From ef63feb5b97cac2771ff15be113eb629eb4ad498 Mon Sep 17 00:00:00 2001
From: Andy Coates
Date: Wed, 18 Mar 2020 18:42:15 +0000
Subject: [PATCH 1/2] fix: fix repartition semantics

Fixes: https://github.com/confluentinc/ksql/issues/4749

##### Background

This change fixes an issue with our repartition semantics.

The old-style query semantics for PARTITION BY are broken. Given:

S1: ROWKEY => B, C

(Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: ROWKEY => B, C

As you can see, the schema of S2 is unchanged. However, the old data in the key has been lost, as it has been overwritten with the data from B, and the key now duplicates the data stored in B. This loss of data on a `SELECT * .. PARTITION BY` needed fixing.

Secondly, with the new primitive-key [work to remove the restriction on key column naming](https://github.com/confluentinc/ksql/issues/3536), the same query semantics do not work. For example, given:

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: B => B, C

This fails because the `B` value column clashes with the `B` key column!

##### The fix

This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column moves the old key column into the value and moves the new key column from the value into the key. For example, given:

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

Results in the schema: S2: B => C, A.

If the PARTITION BY is an expression other than a column reference, then ksql will synthesize a new column name. For example, given:

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT);
```

Results in the schema: S2: KSQL_COL_0 => B, C, A.

[This GitHub issue](https://github.com/confluentinc/ksql/issues/4813) tracks adding the ability to use aliases in PARTITION BY expressions, which will allow a custom name to be assigned.
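For illustration, here is a hypothetical follow-up query against the S2 stream from the example above (stream and column names are assumed from those example schemas, not taken from the test suite). It shows that the pre-repartition key data is no longer lost, because the old key column A is retained as a value column:

```sql
-- A was S1's key column; with the fixed semantics it is carried in S2's value,
-- so it can still be selected after the repartition.
SELECT A, C FROM S2 EMIT CHANGES;
```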
--- .../io/confluent/ksql/name/ColumnNames.java | 63 +++- .../confluent/ksql/name/ColumnNamesTest.java | 71 +++++ .../ksql/planner/LogicalPlanner.java | 26 +- .../ksql/planner/plan/RepartitionNode.java | 20 ++ .../ksql/structured/SchemaKStream.java | 12 +- .../io/confluent/ksql/util/PlanSummary.java | 2 + .../planner/plan/RepartitionNodeTest.java | 125 ++++++++ .../ksql/structured/SchemaKStreamTest.java | 3 +- .../execution/builder/KsqlQueryBuilder.java | 7 + .../ksql/execution/plan/ExecutionStep.java | 3 +- .../ksql/execution/plan/PlanBuilder.java | 2 + .../execution/plan/StreamSelectKeyV1.java | 88 ++++++ .../execution/plan/StreamSelectKeyTest.java | 1 + .../execution/plan/StreamSelectKeyV1Test.java | 52 ++++ .../6.0.0_1584548264774/plan.json | 251 ++++++++++++++++ .../6.0.0_1584548264774/spec.json | 34 +++ .../6.0.0_1584548264774/topology | 69 +++++ .../query-validation-tests/joins.json | 112 +++++++- .../query-validation-tests/partition-by.json | 116 ++++++++ .../ksql/rest/server/KsqlPlanSchemaTest.java | 2 +- .../resources/ksql-plan-schema/schema.json | 30 +- .../streams/ExecutionStepFactory.java | 10 + .../ksql/execution/streams/KSPlanBuilder.java | 12 +- .../execution/streams/PartitionByParams.java | 54 ++++ .../streams/PartitionByParamsFactory.java | 190 +++++++++++++ .../execution/streams/StepSchemaResolver.java | 17 +- .../streams/StreamSelectKeyBuilder.java | 59 ++-- .../streams/StreamSelectKeyBuilderV1.java | 81 ++++++ .../streams/PartitionByParamsFactoryTest.java | 269 ++++++++++++++++++ .../streams/StepSchemaResolverTest.java | 29 +- .../streams/StreamSelectKeyBuilderTest.java | 103 ++++--- .../streams/StreamSelectKeyBuilderV1Test.java | 259 +++++++++++++++++ 32 files changed, 2044 insertions(+), 128 deletions(-) create mode 100644 ksqldb-common/src/test/java/io/confluent/ksql/name/ColumnNamesTest.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/RepartitionNodeTest.java create mode 100644 ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1.java create mode 100644 ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1Test.java create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/topology create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParams.java create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1.java create mode 100644 ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java create mode 100644 ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1Test.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/name/ColumnNames.java b/ksqldb-common/src/main/java/io/confluent/ksql/name/ColumnNames.java index 54f38efcdb96..dd88e59919c6 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/name/ColumnNames.java +++ 
b/ksqldb-common/src/main/java/io/confluent/ksql/name/ColumnNames.java @@ -1,8 +1,8 @@ /* * Copyright 2020 Confluent Inc. * - * Licensed under the Confluent Community License (the "License"; you may not use - * this file except in compliance with the License. You may obtain a copy of the + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the * License at * * http://www.confluent.io/confluent-community-license @@ -15,12 +15,21 @@ package io.confluent.ksql.name; +import io.confluent.ksql.schema.ksql.Column; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import java.util.OptionalInt; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + public final class ColumnNames { private static final String AGGREGATE_COLUMN_PREFIX = "KSQL_AGG_VARIABLE_"; private static final String GENERATED_ALIAS_PREFIX = "KSQL_COL_"; private static final String SYNTHESISED_COLUMN_PREFIX = "KSQL_SYNTH_"; + private static final Pattern GENERATED_ALIAS_PATTERN = Pattern + .compile(GENERATED_ALIAS_PREFIX + "(\\d+)"); + private ColumnNames() { } @@ -32,24 +41,24 @@ public static ColumnName aggregateColumn(final int idx) { } /** - * Where the user hasn't specified an alias for an expression in a SELECT we generate them - * using this method. This value is exposed to the user in the output schema + * Where the user hasn't specified an alias for an expression in a SELECT we generate them using + * this method. This value is exposed to the user in the output schema */ public static ColumnName generatedColumnAlias(final int idx) { return ColumnName.of(GENERATED_ALIAS_PREFIX + idx); } /** - * Used to generate a column name in an intermediate schema, e.g. for a column to hold - * values of a table function. These are never exposed to the user + * Used to generate a column name in an intermediate schema, e.g. for a column to hold values of a + * table function. These are never exposed to the user */ public static ColumnName synthesisedSchemaColumn(final int idx) { return ColumnName.of(SYNTHESISED_COLUMN_PREFIX + idx); } /** - * Used to generate a column alias for a join where the a column with this name exists - * in both of the sources. + * Used to generate a column alias for a join where a column with this name exists in both of + * the sources. */ public static ColumnName generatedJoinColumnAlias( final SourceName sourceName, @@ -61,4 +70,42 @@ public static ColumnName generatedJoinColumnAlias( public static boolean isAggregate(final ColumnName name) { return name.text().startsWith(AGGREGATE_COLUMN_PREFIX); } + + /** + * Determines the next unique column alias. + * + *
<p>
Finds any existing {@code KSQL_COL_x} column names in the supplied {@code sourceSchema} to + * ensure the returned generated column name is unique. + * + * @param sourceSchema the source schema. + * @return a column name in the form {@code KSQL_COL_x} which does not clash with the source schema. + */ + public static ColumnName nextGeneratedColumnAlias(final LogicalSchema sourceSchema) { + final int maxExistingIdx = maxGeneratedAliasIndex(sourceSchema); + return generatedColumnAlias(maxExistingIdx + 1); + } + + /** + * Determines the highest index of generated column names like {@code KSQL_COL_x} in the supplied + * {@code sourceSchema}. + * + * @param sourceSchema the schema. + * @return the highest index, or {@code -1} if the schema contains no generated column names + */ + private static int maxGeneratedAliasIndex(final LogicalSchema sourceSchema) { + return sourceSchema.columns().stream() + .map(Column::name) + .map(ColumnNames::extractGeneratedAliasIndex) + .filter(OptionalInt::isPresent) + .mapToInt(OptionalInt::getAsInt) + .max() + .orElse(-1); + } + + private static OptionalInt extractGeneratedAliasIndex(final ColumnName columnName) { + final Matcher matcher = GENERATED_ALIAS_PATTERN.matcher(columnName.text()); + return matcher.matches() + ? OptionalInt.of(Integer.parseInt(matcher.group(1))) + : OptionalInt.empty(); + } } diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/name/ColumnNamesTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/name/ColumnNamesTest.java new file mode 100644 index 000000000000..89aa4d757871 --- /dev/null +++ b/ksqldb-common/src/test/java/io/confluent/ksql/name/ColumnNamesTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License.
+ */ + +package io.confluent.ksql.name; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import org.junit.Test; + +public class ColumnNamesTest { + + @Test + public void shouldGenerateUniqueAliasesStartingAtZero() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .build(); + + // When: + final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema); + + // Then: + assertThat(result, is(ColumnName.of("KSQL_COL_0"))); + } + + @Test + public void shouldGenerateUniqueAliasesTakingAnyKeyColumnsIntoAccount() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(ColumnName.of("Fred"), SqlTypes.STRING) + .keyColumn(ColumnNames.generatedColumnAlias(1), SqlTypes.STRING) + .keyColumn(ColumnName.of("George"), SqlTypes.STRING) + .build(); + + // When: + final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema); + + // Then: + assertThat(result, is(ColumnName.of("KSQL_COL_2"))); + } + + @Test + public void shouldGenerateUniqueAliasesTakingAnyValueColumnsIntoAccount() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(ColumnName.of("Fred"), SqlTypes.STRING) + .valueColumn(ColumnNames.generatedColumnAlias(1), SqlTypes.STRING) + .valueColumn(ColumnName.of("George"), SqlTypes.STRING) + .build(); + + // When: + final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema); + + // Then: + assertThat(result, is(ColumnName.of("KSQL_COL_2"))); + } +} \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index 47d6dd2eddc5..69201a613165 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -31,6 +31,7 @@ import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; import io.confluent.ksql.execution.plan.SelectExpression; +import io.confluent.ksql.execution.streams.PartitionByParamsFactory; import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory; import io.confluent.ksql.execution.timestamp.TimestampColumn; import io.confluent.ksql.execution.util.ExpressionTypeManager; @@ -552,16 +553,25 @@ private LogicalSchema buildRepartitionedSchema( return sourceSchema; } - final ExpressionTypeManager typeManager = - new ExpressionTypeManager(sourceSchema, functionRegistry); + if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) { + final ExpressionTypeManager expressionTypeManager = + new ExpressionTypeManager(sourceSchema, functionRegistry); - final SqlType keyType = typeManager.getExpressionSqlType(partitionBy); + final SqlType keyType = expressionTypeManager + .getExpressionSqlType(partitionBy); - return LogicalSchema.builder() - .withRowTime() - .keyColumn(SchemaUtil.ROWKEY_NAME, keyType) - .valueColumns(sourceSchema.value()) - .build(); + return LogicalSchema.builder() + .withRowTime() + .keyColumn(SchemaUtil.ROWKEY_NAME, keyType) + .valueColumns(sourceSchema.value()) + .build(); + } + + return PartitionByParamsFactory.buildSchema( + sourceSchema, + partitionBy, + functionRegistry + ); } private static boolean exactlyMatchesKeyColumns( diff --git 
a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java index d6d168c47473..ed98fc10a4d6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java @@ -23,10 +23,15 @@ import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.metastore.model.KeyField; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.name.SourceName; +import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.structured.SchemaKStream; import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; @Immutable public class RepartitionNode extends PlanNode { @@ -78,4 +83,19 @@ public Expression getPartitionBy() { public R accept(final PlanVisitor visitor, final C context) { return visitor.visitRepartition(this, context); } + + @Override + public Stream resolveSelectStar( + final Optional sourceName, + final boolean valueOnly + ) { + final boolean sourceNameMatches = !sourceName.isPresent() || sourceName.equals(getSourceName()); + if (sourceNameMatches && valueOnly) { + // Override set of value columns to take into account the repartition: + return getSchema().withoutMetaAndKeyColsInValue().value().stream().map(Column::name); + } + + // Set of all columns not changed by a repartition: + return super.resolveSelectStar(sourceName, valueOnly); + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index f31c107274ae..a20790af504a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -34,7 +34,6 @@ import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamSelect; -import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamStreamJoin; import io.confluent.ksql.execution.plan.StreamTableJoin; @@ -191,8 +190,6 @@ KeyField findKeyField(final List selectExpressions) { final Optional filtered = found // System columns can not be key fields: .filter(f -> !SchemaUtil.isSystemColumn(f.name())) - // Key columns can not be key fields: - .filter(f -> !schema.isKeyColumn(f.name())) .map(Column::name); return KeyField.of(filtered); @@ -338,11 +335,10 @@ public SchemaKStream selectKey( + "See https://github.com/confluentinc/ksql/issues/4385."); } - final StreamSelectKey step = ExecutionStepFactory.streamSelectKey( - contextStacker, - sourceStep, - keyExpression - ); + final ExecutionStep> step = ksqlConfig + .getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED) + ? 
ExecutionStepFactory.streamSelectKey(contextStacker, sourceStep, keyExpression) + : ExecutionStepFactory.streamSelectKeyV1(contextStacker, sourceStep, keyExpression); return new SchemaKStream<>( step, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java index ba54a44ad0da..ebd64b60f626 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java @@ -26,6 +26,7 @@ import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; +import io.confluent.ksql.execution.plan.StreamSelectKeyV1; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamStreamJoin; @@ -67,6 +68,7 @@ public class PlanSummary { .put(StreamFlatMap.class, "FLAT_MAP") .put(StreamGroupBy.class, "GROUP_BY") .put(StreamSelect.class, "PROJECT") + .put(StreamSelectKeyV1.class, "REKEY") .put(StreamSelectKey.class, "REKEY") .put(StreamSink.class, "SINK") .put(StreamSource.class, "SOURCE") diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/RepartitionNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/RepartitionNodeTest.java new file mode 100644 index 000000000000..19653cc426f8 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/RepartitionNodeTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.planner.plan; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.metastore.model.DataSource.DataSourceType; +import io.confluent.ksql.metastore.model.KeyField; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.name.SourceName; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.util.SchemaUtil; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class RepartitionNodeTest { + + private static final PlanNodeId PLAN_ID = new PlanNodeId("Blah") ; + private static final SourceName MATCHING_SOURCE_NAME = SourceName.of("S1"); + private static final ColumnName K0 = ColumnName.of("K"); + private static final ColumnName V0 = ColumnName.of("V0"); + private static final ColumnName V1 = ColumnName.of("V1"); + private static final ColumnName V2 = ColumnName.of("V2"); + + private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .withRowTime() + .keyColumn(K0, SqlTypes.DOUBLE) + .valueColumn(V0, SqlTypes.DOUBLE) + .valueColumn(V1, SqlTypes.DOUBLE) + .valueColumn(V2, SqlTypes.DOUBLE) + .valueColumn(K0, SqlTypes.DOUBLE) + .valueColumn(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT) + .build(); + + private static final List PARENT_COL_NAMES = ImmutableList.of( + ColumnName.of("this"), + ColumnName.of("that") + ); + + @Mock + private PlanNode parent; + @Mock + private Expression partitionBy; + private RepartitionNode repartitionNode; + + @Before + public void setUp() { + when(parent.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM); + when(parent.getSourceName()).thenReturn(Optional.of(MATCHING_SOURCE_NAME)); + when(parent.resolveSelectStar(any(), anyBoolean())).thenReturn(PARENT_COL_NAMES.stream()); + + repartitionNode = new RepartitionNode(PLAN_ID, parent, SCHEMA, partitionBy, KeyField.none()); + } + + @Test + public void shouldResolveSelectStarIfSourceMatchesAndValuesOnly() { + // When: + final Stream result = repartitionNode.resolveSelectStar( + Optional.of(MATCHING_SOURCE_NAME), + true + ); + + // Then: + final List names = result.collect(Collectors.toList()); + assertThat(names, contains(V0, V1, V2)); + } + + @Test + public void shouldResolveSelectStarIfSourceNotProvidedAndValuesOnly() { + // When: + final Stream result = repartitionNode.resolveSelectStar( + Optional.empty(), + true + ); + + // Then: + final List names = result.collect(Collectors.toList()); + assertThat(names, contains(V0, V1, V2)); + } + + @Test + public void shouldPassResolveSelectStarToParentIfNotValuesOnly() { + // When: + final Stream result = repartitionNode.resolveSelectStar( + Optional.empty(), + false + ); + + // Then: + final List names = result.collect(Collectors.toList()); + assertThat(names, is(PARENT_COL_NAMES)); + + verify(parent).resolveSelectStar(Optional.empty(), false); + } +} \ No newline at 
end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index 20d3488c5752..3ad6e9675df5 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -76,7 +76,6 @@ @RunWith(MockitoJUnitRunner.class) public class SchemaKStreamTest { - private static final SourceName TEST1 = SourceName.of("TEST1"); private static final Expression COL1 = new UnqualifiedColumnReferenceExp(ColumnName.of("COL1")); @@ -513,7 +512,7 @@ public void shouldBuildStepForSelectKey() { assertThat( rekeyedSchemaKStream.getSourceStep(), equalTo( - ExecutionStepFactory.streamSelectKey( + ExecutionStepFactory.streamSelectKeyV1( childContextStacker, initialSchemaKStream.getSourceStep(), new UnqualifiedColumnReferenceExp(ColumnName.of("COL1")) diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/builder/KsqlQueryBuilder.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/builder/KsqlQueryBuilder.java index f29fe7504f9a..33bb2f1ea65c 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/builder/KsqlQueryBuilder.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/builder/KsqlQueryBuilder.java @@ -23,6 +23,7 @@ import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; @@ -98,6 +99,12 @@ public ProcessingLogContext getProcessingLogContext() { return processingLogContext; } + public ProcessingLogger getProcessingLogger(final QueryContext queryContext) { + return processingLogContext + .getLoggerFactory() + .getLogger(QueryLoggerUtil.queryLoggerName(queryId, queryContext)); + } + public ServiceContext getServiceContext() { return serviceContext; } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java index 44bcab20f2c6..bd08466c8f0b 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java @@ -33,7 +33,8 @@ @Type(value = StreamGroupBy.class, name = "streamGroupByV1"), @Type(value = StreamGroupByKey.class, name = "streamGroupByKeyV1"), @Type(value = StreamSelect.class, name = "streamSelectV1"), - @Type(value = StreamSelectKey.class, name = "streamSelectKeyV1"), + @Type(value = StreamSelectKeyV1.class, name = "streamSelectKeyV1"), + @Type(value = StreamSelectKey.class, name = "streamSelectKeyV2"), @Type(value = StreamSink.class, name = "streamSinkV1"), @Type(value = StreamSource.class, name = "streamSourceV1"), @Type(value = WindowedStreamSource.class, name = "windowedStreamSourceV1"), diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java index 81c1ca94cb0e..c9510a304e44 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java +++ 
b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java @@ -37,6 +37,8 @@ public interface PlanBuilder { KStreamHolder visitFlatMap(StreamFlatMap streamFlatMap); + KStreamHolder visitStreamSelectKey(StreamSelectKeyV1 streamSelectKey); + KStreamHolder visitStreamSelectKey(StreamSelectKey streamSelectKey); KStreamHolder visitStreamSink(StreamSink streamSink); diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1.java new file mode 100644 index 000000000000..f08007d030c9 --- /dev/null +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.testing.EffectivelyImmutable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.connect.data.Struct; + +@Immutable +public class StreamSelectKeyV1 implements ExecutionStep> { + + private final ExecutionStepPropertiesV1 properties; + private final Expression keyExpression; + @EffectivelyImmutable + private final ExecutionStep> source; + + public StreamSelectKeyV1( + @JsonProperty(value = "properties", required = true) final ExecutionStepPropertiesV1 props, + @JsonProperty(value = "source", required = true) final + ExecutionStep> source, + @JsonProperty(value = "keyExpression", required = true) final Expression keyExpression + ) { + this.properties = Objects.requireNonNull(props, "props"); + this.source = Objects.requireNonNull(source, "source"); + this.keyExpression = Objects.requireNonNull(keyExpression, "keyExpression"); + } + + @Override + public ExecutionStepPropertiesV1 getProperties() { + return properties; + } + + @Override + @JsonIgnore + public List> getSources() { + return Collections.singletonList(source); + } + + public Expression getKeyExpression() { + return keyExpression; + } + + public ExecutionStep> getSource() { + return source; + } + + @Override + public KStreamHolder build(final PlanBuilder builder) { + return builder.visitStreamSelectKey(this); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamSelectKeyV1 that = (StreamSelectKeyV1) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(keyExpression, that.keyExpression); + } + + @Override + public int hashCode() { + return Objects.hash(properties, source, keyExpression); + } +} diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyTest.java
b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyTest.java index d69f712cf060..8c508bd96c51 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyTest.java @@ -37,6 +37,7 @@ public class StreamSelectKeyTest { @Mock private Expression expression2; + @SuppressWarnings("UnstableApiUsage") @Test public void shouldImplementEquals() { new EqualsTester() diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1Test.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1Test.java new file mode 100644 index 000000000000..54debe4640fd --- /dev/null +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1Test.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.common.testing.EqualsTester; +import io.confluent.ksql.execution.expression.tree.Expression; +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class StreamSelectKeyV1Test { + @Mock + private ExecutionStepPropertiesV1 properties1; + @Mock + private ExecutionStepPropertiesV1 properties2; + @Mock + private ExecutionStep> source1; + @Mock + private ExecutionStep> source2; + @Mock + private Expression expression1; + @Mock + private Expression expression2; + + @SuppressWarnings("UnstableApiUsage") + @Test + public void shouldImplementEquals() { + new EqualsTester() + .addEqualityGroup( + new StreamSelectKeyV1(properties1, source1, expression1), + new StreamSelectKeyV1(properties1, source1, expression1)) + .addEqualityGroup(new StreamSelectKeyV1(properties2, source1, expression1)) + .addEqualityGroup(new StreamSelectKeyV1(properties1, source2, expression1)) + .addEqualityGroup(new StreamSelectKeyV1(properties1, source1, expression2)); + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/plan.json new file mode 100644 index 000000000000..afbb63470573 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/plan.json @@ -0,0 +1,251 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM L (ID DOUBLE) WITH (KAFKA_TOPIC='left_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "L", + "schema" : "`ROWKEY` STRING KEY, `ID` 
DOUBLE", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "left_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM R (ID INTEGER) WITH (KAFKA_TOPIC='right_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "R", + "schema" : "`ROWKEY` STRING KEY, `ID` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "right_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT L.ID L_ID\nFROM L L\nINNER JOIN R R WITHIN 30 SECONDS ON ((CAST(L.ID AS INTEGER) = R.ID))\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` INTEGER KEY, `L_ID` DOUBLE", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "R", "L" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamStreamJoinV1", + "properties" : { + "queryContext" : "Join" + }, + "joinType" : "INNER", + "leftInternalFormats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "rightInternalFormats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "leftSource" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "PrependAliasLeft" + }, + "source" : { + "@type" : "streamSelectKeyV1", + "properties" : { + "queryContext" : "LeftSourceKeyed" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KafkaTopic_Left/Source" + }, + "topicName" : "left_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `ID` DOUBLE" + }, + "keyExpression" : "CAST(ID AS INTEGER)" + }, + "selectExpressions" : [ "ID AS L_ID", "ROWTIME AS L_ROWTIME", "ROWKEY AS L_ROWKEY" ] + }, + "rightSource" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "PrependAliasRight" + }, + "source" : { + "@type" : "streamSelectKeyV1", + "properties" : { + "queryContext" : "RightSourceKeyed" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KafkaTopic_Right/Source" + }, + "topicName" : "right_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + 
"sourceSchema" : "`ROWKEY` STRING KEY, `ID` INTEGER" + }, + "keyExpression" : "ID" + }, + "selectExpressions" : [ "ID AS R_ID", "ROWTIME AS R_ROWTIME", "ROWKEY AS R_ROWKEY" ] + }, + "beforeMillis" : 30.000000000, + "afterMillis" : 30.000000000 + }, + "selectExpressions" : [ "L_ID AS L_ID" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT", + "timestampColumn" : null + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent6364076579994155926", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/spec.json new file mode 100644 index 
000000000000..a40945cb1e35 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/spec.json @@ -0,0 +1,34 @@ +{ + "version" : "6.0.0", + "timestamp" : 1584548264774, + "schemas" : { + "CSAS_OUTPUT_0.KafkaTopic_Left.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.KafkaTopic_Right.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.Join.Left" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.Join.Right" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "left_topic", + "key" : "", + "value" : { + "id" : 1.0 + }, + "timestamp" : 10 + }, { + "topic" : "right_topic", + "key" : "", + "value" : { + "id" : 1 + }, + "timestamp" : 11 + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "L_ID" : 1.0 + }, + "timestamp" : 11 + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/topology new file mode 100644 index 000000000000..239bf5b7d656 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/topology @@ -0,0 +1,69 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [left_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> KSTREAM-FILTER-0000000002 + <-- KSTREAM-SOURCE-0000000000 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> PrependAliasLeft + <-- KSTREAM-FILTER-0000000002 + Processor: PrependAliasLeft (stores: []) + --> Join-left-repartition-filter + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: Join-left-repartition-filter (stores: []) + --> Join-left-repartition-sink + <-- PrependAliasLeft + Sink: Join-left-repartition-sink (topic: Join-left-repartition) + <-- Join-left-repartition-filter + + Sub-topology: 1 + Source: KSTREAM-SOURCE-0000000005 (topics: [right_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000006 + Processor: KSTREAM-TRANSFORMVALUES-0000000006 (stores: []) + --> KSTREAM-FILTER-0000000007 + <-- KSTREAM-SOURCE-0000000005 + Processor: KSTREAM-FILTER-0000000007 (stores: []) + --> KSTREAM-KEY-SELECT-0000000008 + <-- KSTREAM-TRANSFORMVALUES-0000000006 + Processor: KSTREAM-KEY-SELECT-0000000008 (stores: []) + --> PrependAliasRight + <-- KSTREAM-FILTER-0000000007 + Processor: PrependAliasRight (stores: []) + --> Join-right-repartition-filter + <-- KSTREAM-KEY-SELECT-0000000008 + Processor: Join-right-repartition-filter (stores: []) + --> Join-right-repartition-sink + <-- PrependAliasRight + Sink: Join-right-repartition-sink (topic: Join-right-repartition) + <-- Join-right-repartition-filter + + Sub-topology: 2 + Source: Join-left-repartition-source (topics: [Join-left-repartition]) + --> Join-this-windowed + Source: Join-right-repartition-source (topics: [Join-right-repartition]) + --> Join-other-windowed + Processor: Join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store]) + --> Join-other-join + <-- Join-right-repartition-source + Processor: Join-this-windowed (stores: 
[KSTREAM-JOINTHIS-0000000018-store]) + --> Join-this-join + <-- Join-left-repartition-source + Processor: Join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store]) + --> Join-merge + <-- Join-other-windowed + Processor: Join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store]) + --> Join-merge + <-- Join-this-windowed + Processor: Join-merge (stores: []) + --> Project + <-- Join-this-join, Join-other-join + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000022 + <-- Join-merge + Sink: KSTREAM-SINK-0000000022 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json index bc1316a60260..2714c0991ad0 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json @@ -1,5 +1,57 @@ { "tests": [ + { + "name": "matching columns in both sides = select *", + "statements": [ + "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');", + "CREATE STREAM R (A INT KEY, B INT, C INT) WITH (kafka_topic='RIGHT', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT * FROM L INNER JOIN R WITHIN 10 SECONDS ON L.A = R.A;" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [ + {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 10}, + {"topic": "RIGHT", "key": 0, "value": {"B": -1, "C": -2}, "timestamp": 11} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"L_ROWTIME": 10, "R_ROWTIME": 11, "L_A": 0, "R_A": 0, "L_B": 1, "R_B": -1, "L_C": 2, "R_C": -2}, "timestamp": 11} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "A INT KEY, L_ROWTIME BIGINT, L_A INT, L_B INT, L_C INT, R_ROWTIME BIGINT, R_A INT, R_B INT, R_C INT"} + ], + "topics": { + "blacklist": ".*-repartition" + } + } + }, + { + "name": "matching columns in both sides = select left.* and right.*", + "statements": [ + "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');", + "CREATE STREAM R (A INT KEY, B INT, C INT) WITH (kafka_topic='RIGHT', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT l.*, r.* FROM L INNER JOIN R WITHIN 10 SECONDS ON L.A = R.A;" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [ + {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 10}, + {"topic": "RIGHT", "key": 0, "value": {"B": -1, "C": -2}, "timestamp": 11} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"L_ROWTIME": 10, "R_ROWTIME": 11, "L_A": 0, "R_A": 0, "L_B": 1, "R_B": -1, "L_C": 2, "R_C": -2}, "timestamp": 11} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "A INT KEY, L_ROWTIME BIGINT, L_A INT, L_B INT, L_C INT, R_ROWTIME BIGINT, R_A INT, R_B INT, R_C INT"} + ], + "topics": { + "blacklist": ".*-repartition" + } + } + }, { "name": "stream-stream left join", "format": ["AVRO", "JSON"], @@ -104,7 +156,7 @@ } }, { - "name": "stream-stream left join with rowkey - rekey", + "name": "stream-stream left join with key in projection - rekey", "format": ["AVRO", "JSON"], "statements": [ "CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", @@ -135,12 +187,12 @@ ], "post": { "sources": [ - {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ROWKEY BIGINT KEY, T_ID BIGINT, NAME 
STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"} + {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ID BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"} ] } }, { - "name": "stream-stream left join with rowkey - rekey", + "name": "stream-stream left join with key in projection - rekey", "format": ["PROTOBUF"], "statements": [ "CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", @@ -171,7 +223,7 @@ ], "post": { "sources": [ - {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ROWKEY BIGINT KEY, T_ID BIGINT, T_K STRING, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"} + {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ID BIGINT KEY, T_ID BIGINT, T_K STRING, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"} ] } }, @@ -223,7 +275,7 @@ ], "post": { "sources": [ - {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ROWKEY BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"} + {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ID BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"} ] } }, @@ -275,7 +327,7 @@ ], "post": { "sources": [ - {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ROWKEY BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"} + {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ID BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"} ] } }, @@ -1216,7 +1268,7 @@ {"topic": "right_topic", "key": 1, "value": {"x": 3}, "timestamp": 10} ], "outputs": [ - {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-right-repartition", "key": 1, "value": {"TT_X": 3, "TT_ROWTIME": 10, "TT_ID": 1}, "timestamp": 10}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-right-repartition", "key": 1, "value": {"TT_X": 3, "TT_ROWTIME": 10, "TT_ID": 1, "TT_KSQL_COL_0": 1}, "timestamp": 10}, {"topic": "OUTPUT", "key": 1, "value": {"T_X": 2}, "timestamp": 10} ] }, @@ -1238,6 +1290,25 @@ {"topic": "OUTPUT", "key": 1, "value": {"L_X": 2}, "timestamp": 11} ] }, + { + "name": "stream-stream join on expression where schema contains KSQL_COL_xx column names", + "statements": [ + "CREATE STREAM TEST1 (KSQL_COL_0 bigint KEY, KSQL_COL_2 bigint) WITH (kafka_topic='left_topic', value_format='JSON');", + "CREATE STREAM TEST2 (KSQL_COL_1 int KEY, KSQL_COL_3 int) WITH (kafka_topic='right_topic', value_format='JSON');", + "CREATE STREAM OUTPUT as SELECT t.KSQL_COL_2 FROM test1 t JOIN test2 tt WITHIN 30 seconds ON t.KSQL_COL_0 = CAST(tt.KSQL_COL_1 AS BIGINT);" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [ + {"topic": "left_topic", "key": 1, "value": {"KSQL_COL_2": 2}, "timestamp": 10}, + {"topic": "right_topic", "key": 1, "value": {"KSQL_COL_3": 3}, "timestamp": 10} + ], + "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-right-repartition", "key": 1, "value": {"TT_KSQL_COL_3": 3, "TT_ROWTIME": 10, "TT_KSQL_COL_1": 1, "TT_KSQL_COL_4": 1}, "timestamp": 10}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_2": 2}, "timestamp": 10} + ] + }, { "name": "stream-stream join - contains subscript", "statements": [ @@ -1420,7 +1491,7 @@ "name": "OUTPUT", "type": "stream", "keyFormat": {"format": "KAFKA"}, - "schema": "ROWKEY BIGINT KEY, S_ROWTIME BIGINT, S_K STRING, S_SF BIGINT, T_ROWTIME BIGINT, T_ID BIGINT, T_TF INT" + "schema": "SF BIGINT KEY, S_ROWTIME BIGINT, S_K STRING, S_SF 
BIGINT, T_ROWTIME BIGINT, T_ID BIGINT, T_TF INT" } ] } @@ -1460,7 +1531,7 @@ ], "post": { "sources": [ - {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY INT KEY, L_ID STRING, L1 INT, R1 INT"} + {"name": "OUTPUT", "type": "stream", "schema": "L0 INT KEY, L_ID STRING, L1 INT, R1 INT"} ] } }, @@ -1484,7 +1555,7 @@ ], "post": { "sources": [ - {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY BIGINT KEY, L_ID STRING, L1 INT, R1 INT"} + {"name": "OUTPUT", "type": "stream", "schema": "L0 BIGINT KEY, L_ID STRING, L1 INT, R1 INT"} ] } }, @@ -1508,7 +1579,7 @@ ], "post": { "sources": [ - {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY DOUBLE KEY, L_ID STRING, L1 INT, R1 INT"} + {"name": "OUTPUT", "type": "stream", "schema": "L0 DOUBLE KEY, L_ID STRING, L1 INT, R1 INT"} ] } }, @@ -1532,7 +1603,7 @@ ], "post": { "sources": [ - {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, L_ID STRING, L1 INT, R1 INT"} + {"name": "OUTPUT", "type": "stream", "schema": "L0 STRING KEY, L_ID STRING, L1 INT, R1 INT"} ] } }, @@ -1640,7 +1711,7 @@ } }, { - "name": "windowed vand non-windowed - INT", + "name": "windowed and non-windowed - INT", "statements": [ "CREATE STREAM S1 (ID INT KEY, V bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='SESSION');", "CREATE STREAM S2 (ID INT KEY, V bigint) WITH (kafka_topic='right_topic', value_format='JSON');", @@ -3527,6 +3598,21 @@ {"topic": "OUTPUT", "key": 1, "value": {"L_ID": 1}, "timestamp": 11} ] }, + { + "name": "stream stream join - contains CAST double to int - reversed", + "statements": [ + "CREATE STREAM L (ID DOUBLE) WITH (kafka_topic='left_topic', value_format='JSON');", + "CREATE STREAM R (ID INT) WITH (kafka_topic='right_topic', value_format='JSON');", + "CREATE STREAM OUTPUT as SELECT L.ID FROM L JOIN R WITHIN 30 seconds ON CAST(L.id AS INT) = R.id;" + ], + "inputs": [ + {"topic": "left_topic", "value": {"id": 1.0}, "timestamp": 10}, + {"topic": "right_topic", "value": {"id": 1}, "timestamp": 11} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"L_ID": 1.0}, "timestamp": 11} + ] + }, { "name": "stream stream join - contains subscript", "statements": [ diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json index 5879cef22ba7..be273c741e9d 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -1,5 +1,121 @@ { "tests": [ + { + "name": "only key column - select star", + "statements": [ + "CREATE STREAM INPUT (ID INT KEY, NAME STRING) with (kafka_topic='input', value_format='JSON');", + "CREATE STREAM OUTPUT AS select * from INPUT partition by ID;" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [{"topic": "input", "key": 10, "value": {"NAME": "bob"}}], + "outputs": [{"topic": "OUTPUT", "key": 10, "value": {"NAME": "bob"}}], + "post": { + "topics": { + "blacklist": ".*-repartition" + }, + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ID INT KEY, NAME STRING"} + ] + } + }, + { + "name": "single value column - select star", + "statements": [ + "CREATE STREAM INPUT (ID INT KEY, NAME STRING, AGE INT) with (kafka_topic='input', value_format='JSON');", + "CREATE STREAM OUTPUT AS select * from INPUT partition by NAME;" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [{"topic": 
"input", "key": 10, "value": {"NAME": "bob", "AGE": 22}}], + "outputs": [{"topic": "OUTPUT", "key": "bob", "value": {"ID": 10, "AGE": 22}}], + "post": { + "topics": { + "blacklist": ".*-repartition" + }, + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "NAME STRING KEY, AGE INT, ID INT"} + ] + } + }, + { + "name": "only key column - select columns", + "statements": [ + "CREATE STREAM INPUT (ID INT KEY, NAME STRING, OTHER INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT NAME FROM INPUT PARTITION BY ID;" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "key": 11, "value": {"name": "a"}}, + {"topic": "test_topic", "key": 10, "value": {"name": "b"}}, + {"topic": "test_topic", "key": 11, "value": {"name": "c"}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 11, "value": {"NAME": "a"}}, + {"topic": "OUTPUT", "key": 10, "value": {"NAME": "b"}}, + {"topic": "OUTPUT", "key": 11, "value": {"NAME": "c"}} + ], + "post": { + "sources": [ + { + "name": "OUTPUT", + "type": "stream", + "keyFormat": {"format": "KAFKA"}, + "schema": "ID INT KEY, NAME STRING" + } + ] + } + }, + { + "name": "only key column - select star - with join on keys", + "statements": [ + "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');", + "CREATE STREAM R (X INT KEY, Y INT, Z INT) WITH (kafka_topic='RIGHT', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT * FROM L JOIN R WITHIN 10 SECONDS ON L.A = R.X PARTITION BY L.B;" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [ + {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 11}, + {"topic": "RIGHT", "key": 0, "value": {"Y": -1, "Z": -2}, "timestamp": 12} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"L_ROWTIME": 11, "R_ROWTIME": 12, "L_A": 0, "R_X": 0, "R_Y": -1, "L_C": 2, "R_Z": -2}} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "L_B INT KEY, L_C INT, L_ROWTIME BIGINT, L_A INT, R_Y INT, R_Z INT, R_ROWTIME BIGINT, R_X INT"} + ] + } + }, + { + "name": "only key column - select star - with join on value columns", + "statements": [ + "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');", + "CREATE STREAM R (X INT KEY, Y INT, Z INT) WITH (kafka_topic='RIGHT', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT * FROM L JOIN R WITHIN 10 SECONDS ON L.B = R.Y PARTITION BY A;" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [ + {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 11}, + {"topic": "RIGHT", "key": -1, "value": {"Y": 1, "Z": -2}, "timestamp": 12} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"L_ROWTIME": 11, "R_ROWTIME": 12, "R_X": -1, "L_B": 1, "R_Y": 1, "L_C": 2, "R_Z": -2}} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "L_A INT KEY, L_B INT, L_C INT, L_ROWTIME BIGINT, R_Y INT, R_Z INT, R_ROWTIME BIGINT, R_X INT"} + ] + } + }, { "name": "partition by with projection select some", "statements": [ diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlPlanSchemaTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlPlanSchemaTest.java index c7b17221a973..1036b5f3a9b9 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlPlanSchemaTest.java +++ 
b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlPlanSchemaTest.java @@ -44,7 +44,7 @@ public void shouldBuildSameSchemaForKsqlPlan() throws IOException { "Detected a change to the schema of the KSQL physical plan. This is dangerous. " + "It means that KSQL may no longer be able to read and execute older plans. " + "If you're sure that your change is backwards-compatible, then please regenerate " - + "the schema using KsqlPlanSchemaGenerator and update schema.json.", + + "the schema.json by running KsqlPlanSchemaTest.runMeToRegenerateSchemaFile().", jsonSchema, is(expected) ); diff --git a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json index 3fde5a933798..2d018d4b4e27 100644 --- a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json +++ b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json @@ -113,7 +113,9 @@ }, "properties" : { "type" : "object", - "additionalProperties": {"type": "string"} + "additionalProperties" : { + "type" : "string" + } } }, "required" : [ "format" ] @@ -405,7 +407,7 @@ "title" : "streamSelectV1", "required" : [ "@type", "properties", "source", "selectExpressions" ] }, - "StreamSelectKey" : { + "StreamSelectKeyV1" : { "type" : "object", "additionalProperties" : false, "properties" : { @@ -427,6 +429,28 @@ "title" : "streamSelectKeyV1", "required" : [ "@type", "properties", "source", "keyExpression" ] }, + "StreamSelectKey" : { + "type" : "object", + "additionalProperties" : false, + "properties" : { + "@type" : { + "type" : "string", + "enum" : [ "streamSelectKeyV2" ], + "default" : "streamSelectKeyV2" + }, + "properties" : { + "$ref" : "#/definitions/ExecutionStepPropertiesV1" + }, + "source" : { + "$ref" : "#/definitions/ExecutionStep" + }, + "keyExpression" : { + "type" : "string" + } + }, + "title" : "streamSelectKeyV2", + "required" : [ "@type", "properties", "source", "keyExpression" ] + }, "StreamSink" : { "type" : "object", "additionalProperties" : false, @@ -853,6 +877,8 @@ "$ref" : "#/definitions/StreamGroupByKey" }, { "$ref" : "#/definitions/StreamSelect" + }, { + "$ref" : "#/definitions/StreamSelectKeyV1" }, { "$ref" : "#/definitions/StreamSelectKey" }, { diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index 1309e6284527..357e5ab3f36f 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -34,6 +34,7 @@ import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; +import io.confluent.ksql.execution.plan.StreamSelectKeyV1; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamStreamJoin; @@ -233,6 +234,15 @@ public static StreamStreamJoin streamStreamJoin( ); } + public static StreamSelectKeyV1 streamSelectKeyV1( + final QueryContext.Stacker stacker, + final ExecutionStep> source, + final Expression fieldName + ) { + final QueryContext queryContext = stacker.getQueryContext(); + return new StreamSelectKeyV1(new ExecutionStepPropertiesV1(queryContext), source, fieldName); + } + public static StreamSelectKey streamSelectKey( final 
QueryContext.Stacker stacker, final ExecutionStep> source, diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java index be7219027455..b801925e3c0e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java @@ -28,6 +28,7 @@ import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; +import io.confluent.ksql.execution.plan.StreamSelectKeyV1; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamStreamJoin; @@ -135,7 +136,16 @@ public KStreamHolder visitFlatMap(final StreamFlatMap streamFlatMap) { @Override public KStreamHolder visitStreamSelectKey( - final StreamSelectKey streamSelectKey) { + final StreamSelectKeyV1 streamSelectKey + ) { + final KStreamHolder source = streamSelectKey.getSource().build(this); + return StreamSelectKeyBuilderV1.build(source, streamSelectKey, queryBuilder); + } + + @Override + public KStreamHolder visitStreamSelectKey( + final StreamSelectKey streamSelectKey + ) { final KStreamHolder source = streamSelectKey.getSource().build(this); return StreamSelectKeyBuilder.build(source, streamSelectKey, queryBuilder); } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParams.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParams.java new file mode 100644 index 000000000000..138b2f2cc036 --- /dev/null +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParams.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */
+
+package io.confluent.ksql.execution.streams;
+
+import static java.util.Objects.requireNonNull;
+
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.streams.KeyValue;
+
+public final class PartitionByParams {
+
+  private final LogicalSchema schema;
+  private final BiPredicate<Struct, GenericRow> predicate;
+  private final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper;
+
+  public PartitionByParams(
+      final LogicalSchema schema,
+      final BiPredicate<Struct, GenericRow> predicate,
+      final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper
+  ) {
+    this.schema = requireNonNull(schema, "schema");
+    this.predicate = requireNonNull(predicate, "predicate");
+    this.mapper = requireNonNull(mapper, "mapper");
+  }
+
+  public LogicalSchema getSchema() {
+    return schema;
+  }
+
+  public BiPredicate<Struct, GenericRow> getPredicate() {
+    return predicate;
+  }
+
+  public BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> getMapper() {
+    return mapper;
+  }
+}
diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java
new file mode 100644
index 000000000000..98ecfc27bd63
--- /dev/null
+++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.codegen.CodeGenRunner; +import io.confluent.ksql.execution.codegen.ExpressionMetadata; +import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.util.EngineProcessingLogMessageFactory; +import io.confluent.ksql.execution.util.ExpressionTypeManager; +import io.confluent.ksql.execution.util.StructKeyUtil; +import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.name.ColumnNames; +import io.confluent.ksql.schema.ksql.Column; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; +import java.util.function.Function; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.KeyValue; + +public final class PartitionByParamsFactory { + + private PartitionByParamsFactory() { + } + + public static PartitionByParams build( + final LogicalSchema sourceSchema, + final Expression partitionBy, + final KsqlConfig ksqlConfig, + final FunctionRegistry functionRegistry, + final ProcessingLogger logger + ) { + final Optional partitionByCol = getPartitionByCol(sourceSchema, partitionBy); + + final Function evaluator = buildExpressionEvaluator( + sourceSchema, + partitionBy, + ksqlConfig, + functionRegistry, + logger + ); + + final LogicalSchema resultSchema = + buildSchema(sourceSchema, partitionBy, functionRegistry, partitionByCol); + + return buildMapper(resultSchema, partitionByCol, evaluator); + } + + public static LogicalSchema buildSchema( + final LogicalSchema sourceSchema, + final Expression partitionBy, + final FunctionRegistry functionRegistry + ) { + final Optional partitionByCol = getPartitionByCol(sourceSchema, partitionBy); + return buildSchema(sourceSchema, partitionBy, functionRegistry, partitionByCol); + } + + private static LogicalSchema buildSchema( + final LogicalSchema sourceSchema, + final Expression partitionBy, + final FunctionRegistry functionRegistry, + final Optional partitionByCol + ) { + final ExpressionTypeManager expressionTypeManager = + new ExpressionTypeManager(sourceSchema, functionRegistry); + + final SqlType keyType = expressionTypeManager + .getExpressionSqlType(partitionBy); + + final ColumnName newKeyName = partitionByCol + .map(Column::name) + .orElseGet(() -> ColumnNames.nextGeneratedColumnAlias(sourceSchema)); + + final Builder builder = LogicalSchema.builder() + .withRowTime() + .keyColumn(newKeyName, keyType) + .valueColumns(sourceSchema.value()); + + if (!partitionByCol.isPresent()) { + // New key column added, copy in to value schema: + builder.valueColumn(newKeyName, keyType); + } + + return builder.build(); + } + + private static Optional getPartitionByCol( + final LogicalSchema sourceSchema, + final Expression partitionBy + ) { + if (!(partitionBy instanceof ColumnReferenceExp)) { + return Optional.empty(); + } + + final ColumnName columnName = ((ColumnReferenceExp) partitionBy).getColumnName(); + + final Column column = sourceSchema + 
.findValueColumn(columnName) + .orElseThrow(() -> new IllegalStateException("Unknown partition by column: " + columnName)); + + return Optional.of(column); + } + + private static PartitionByParams buildMapper( + final LogicalSchema resultSchema, + final Optional partitionByCol, + final Function evaluator + ) { + // If partitioning by something other than an existing column, then a new key will have + // been synthesized. This new key must be appended to the value to make it available for + // stream processing, in the same way SourceBuilder appends the key and rowtime to the value: + final boolean appendNewKey = !partitionByCol.isPresent(); + + final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(resultSchema); + + final BiPredicate predicate = (k, v) -> { + if (v == null) { + return false; + } + + return evaluator.apply(v) != null; + }; + + final BiFunction> mapper = (k, v) -> { + final Object newKey = evaluator.apply(v); + final Struct structKey = keyBuilder.build(newKey); + + if (appendNewKey) { + v.append(newKey); + } + + return new KeyValue<>(structKey, v); + }; + + return new PartitionByParams(resultSchema, predicate, mapper); + } + + private static Function buildExpressionEvaluator( + final LogicalSchema schema, + final Expression partitionBy, + final KsqlConfig ksqlConfig, + final FunctionRegistry functionRegistry, + final ProcessingLogger logger + ) { + final CodeGenRunner codeGen = new CodeGenRunner( + schema, + ksqlConfig, + functionRegistry + ); + + final ExpressionMetadata expressionMetadata = codeGen + .buildCodeGenFromParseTree(partitionBy, "SelectKey"); + + return row -> { + try { + return expressionMetadata.evaluate(row); + } catch (final Exception e) { + final String errorMsg = "Error computing new key from expression " + + expressionMetadata.getExpression() + + " : " + + e.getMessage(); + + logger.error(EngineProcessingLogMessageFactory.recordProcessingError(errorMsg, e, row)); + return null; + } + }; + } +} diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java index 3671ea4d6f6f..dcadea728dc9 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java @@ -28,6 +28,7 @@ import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; +import io.confluent.ksql.execution.plan.StreamSelectKeyV1; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamStreamJoin; @@ -73,6 +74,7 @@ public final class StepSchemaResolver { .put(StreamGroupBy.class, StepSchemaResolver::handleStreamGroupBy) .put(StreamGroupByKey.class, StepSchemaResolver::sameSchema) .put(StreamSelect.class, StepSchemaResolver::handleStreamSelect) + .put(StreamSelectKeyV1.class, StepSchemaResolver::handleSelectKeyV1) .put(StreamSelectKey.class, StepSchemaResolver::handleSelectKey) .put(StreamSink.class, StepSchemaResolver::sameSchema) .put(StreamSource.class, StepSchemaResolver::handleSource) @@ -207,9 +209,9 @@ private LogicalSchema handleStreamSelect( return buildSelectSchema(schema, step.getSelectExpressions()); } - private LogicalSchema handleSelectKey( + private LogicalSchema handleSelectKeyV1( final LogicalSchema sourceSchema, - final 
StreamSelectKey step + final StreamSelectKeyV1 step ) { final ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager(sourceSchema, functionRegistry); @@ -224,6 +226,17 @@ private LogicalSchema handleSelectKey( .build(); } + private LogicalSchema handleSelectKey( + final LogicalSchema sourceSchema, + final StreamSelectKey step + ) { + return PartitionByParamsFactory.buildSchema( + sourceSchema, + step.getKeyExpression(), + functionRegistry + ); + } + private LogicalSchema handleSource(final LogicalSchema schema, final SourceStep step) { return buildSourceSchema(schema, false); } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java index 6594ea9ecc39..c98eb61259c2 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java @@ -17,65 +17,58 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; -import io.confluent.ksql.execution.codegen.CodeGenRunner; -import io.confluent.ksql.execution.codegen.ExpressionMetadata; +import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.plan.KStreamHolder; import io.confluent.ksql.execution.plan.KeySerdeFactory; import io.confluent.ksql.execution.plan.StreamSelectKey; -import io.confluent.ksql.execution.util.StructKeyUtil; -import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.schema.ksql.LogicalSchema; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; public final class StreamSelectKeyBuilder { - private static final String EXP_TYPE = "SelectKey"; - private StreamSelectKeyBuilder() { } + @SuppressWarnings("unchecked") public static KStreamHolder build( final KStreamHolder stream, final StreamSelectKey selectKey, final KsqlQueryBuilder queryBuilder ) { final LogicalSchema sourceSchema = stream.getSchema(); + final QueryContext queryContext = selectKey.getProperties().getQueryContext(); + + final ProcessingLogger logger = queryBuilder.getProcessingLogger(queryContext); - final ExpressionMetadata expression = buildExpressionEvaluator( - selectKey, - queryBuilder, - sourceSchema + final PartitionByParams params = PartitionByParamsFactory.build( + sourceSchema, + selectKey.getKeyExpression(), + queryBuilder.getKsqlConfig(), + queryBuilder.getFunctionRegistry(), + logger ); - final LogicalSchema resultSchema = new StepSchemaResolver(queryBuilder.getKsqlConfig(), - queryBuilder.getFunctionRegistry()).resolve(selectKey, sourceSchema); + final BiPredicate predicate = params.getPredicate(); + final BiFunction> mapper = params.getMapper(); - final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(resultSchema); + // This cast is safe because selectKey is not allowed on windowed streams: + final KStream kStream = (KStream) stream.getStream(); - final KStream kstream = stream.getStream(); - final KStream rekeyed = kstream - .filter((key, val) -> val != null && expression.evaluate(val) != null) - .selectKey((key, val) -> keyBuilder.build(expression.evaluate(val))); + final 
KStream reKeyed = kStream + .filter(predicate::test, Named + .as(queryContext.formatContext() + "-FilterNulls")) + .map(mapper::apply, Named.as(queryContext.formatContext() + "-SelectKey")); return new KStreamHolder<>( - rekeyed, - resultSchema, + reKeyed, + params.getSchema(), KeySerdeFactory.unwindowed(queryBuilder) ); } - - private static ExpressionMetadata buildExpressionEvaluator( - final StreamSelectKey selectKey, - final KsqlQueryBuilder queryBuilder, - final LogicalSchema sourceSchema - ) { - final CodeGenRunner codeGen = new CodeGenRunner( - sourceSchema, - queryBuilder.getKsqlConfig(), - queryBuilder.getFunctionRegistry() - ); - - return codeGen.buildCodeGenFromParseTree(selectKey.getKeyExpression(), EXP_TYPE); - } } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1.java new file mode 100644 index 000000000000..7980131213db --- /dev/null +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.codegen.CodeGenRunner; +import io.confluent.ksql.execution.codegen.ExpressionMetadata; +import io.confluent.ksql.execution.plan.KStreamHolder; +import io.confluent.ksql.execution.plan.KeySerdeFactory; +import io.confluent.ksql.execution.plan.StreamSelectKeyV1; +import io.confluent.ksql.execution.util.StructKeyUtil; +import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KStream; + +public final class StreamSelectKeyBuilderV1 { + + private static final String EXP_TYPE = "SelectKey"; + + private StreamSelectKeyBuilderV1() { + } + + public static KStreamHolder build( + final KStreamHolder stream, + final StreamSelectKeyV1 selectKey, + final KsqlQueryBuilder queryBuilder + ) { + final LogicalSchema sourceSchema = stream.getSchema(); + + final ExpressionMetadata expression = buildExpressionEvaluator( + selectKey, + queryBuilder, + sourceSchema + ); + + final LogicalSchema resultSchema = new StepSchemaResolver(queryBuilder.getKsqlConfig(), + queryBuilder.getFunctionRegistry()).resolve(selectKey, sourceSchema); + + final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(resultSchema); + + final KStream kstream = stream.getStream(); + final KStream rekeyed = kstream + .filter((key, val) -> val != null && expression.evaluate(val) != null) + .selectKey((key, val) -> keyBuilder.build(expression.evaluate(val))); + + return new KStreamHolder<>( + rekeyed, + resultSchema, + KeySerdeFactory.unwindowed(queryBuilder) + ); + } + + private static ExpressionMetadata 
buildExpressionEvaluator( + final StreamSelectKeyV1 selectKey, + final KsqlQueryBuilder queryBuilder, + final LogicalSchema sourceSchema + ) { + final CodeGenRunner codeGen = new CodeGenRunner( + sourceSchema, + queryBuilder.getKsqlConfig(), + queryBuilder.getFunctionRegistry() + ); + + return codeGen.buildCodeGenFromParseTree(selectKey.getKeyExpression(), EXP_TYPE); + } +} diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java new file mode 100644 index 000000000000..ea6375bd6e95 --- /dev/null +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java @@ -0,0 +1,269 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; +import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression.Sign; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; +import io.confluent.ksql.execution.util.StructKeyUtil; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.function.KsqlScalarFunction; +import io.confluent.ksql.function.UdfFactory; +import io.confluent.ksql.function.types.ParamTypes; +import io.confluent.ksql.function.udf.Kudf; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.name.FunctionName; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.SchemaUtil; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.KeyValue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class PartitionByParamsFactoryTest { + + private static final KsqlConfig KSQL_CONFIG = new KsqlConfig(ImmutableMap.of()); + + private static final ColumnName COL0 = ColumnName.of("COL0"); + private static final ColumnName COL1 = ColumnName.of("COL1"); + private static final ColumnName COL2 = ColumnName.of("KSQL_COL_3"); + + 
private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .keyColumn(COL0, SqlTypes.STRING) + .valueColumn(COL1, SqlTypes.INTEGER) + .valueColumn(COL2, SqlTypes.INTEGER) + .valueColumn(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT) + .valueColumn(COL0, SqlTypes.STRING) + .build(); + + private static final FunctionName FAILING_UDF_NAME = FunctionName.of("I_THROW"); + private static final KsqlScalarFunction FAILING_UDF_FUNC = KsqlScalarFunction.createLegacyBuiltIn( + SqlTypes.INTEGER, + ImmutableList.of(ParamTypes.STRING), + FAILING_UDF_NAME, + FailingUdf.class + ); + + private static final FunctionName CONSTANT_UDF_NAME = FunctionName.of("I_RETURN_42"); + private static final KsqlScalarFunction CONSTANT_UDF_FUNC = KsqlScalarFunction + .createLegacyBuiltIn( + SqlTypes.BIGINT, + ImmutableList.of(ParamTypes.INTEGER), + CONSTANT_UDF_NAME, + ConstantUdf.class + ); + + private static final FunctionCall FAILING_UDF = + new FunctionCall(FAILING_UDF_NAME, ImmutableList.of()); + + private static final String OLD_KEY = "oldKey"; + private static final int COL1_VALUE = 123; + + @Mock + private ProcessingLogger logger; + @Mock + private FunctionRegistry functionRegistry; + @Mock + private UdfFactory failingUdfFactory; + @Mock + private UdfFactory constantUdfFactory; + + private final Struct key = new Struct(SCHEMA.keyConnectSchema()); + private final GenericRow value = new GenericRow(); + + @Before + public void setUp() { + when(functionRegistry.getUdfFactory(FAILING_UDF_NAME)).thenReturn(failingUdfFactory); + when(failingUdfFactory.getFunction(any())).thenReturn(FAILING_UDF_FUNC); + + when(functionRegistry.getUdfFactory(CONSTANT_UDF_NAME)).thenReturn(constantUdfFactory); + when(constantUdfFactory.getFunction(any())).thenReturn(CONSTANT_UDF_FUNC); + + key.put(COL0.text(), OLD_KEY); + value + .append(COL1_VALUE) // COL1 + .append(10L) // COL2 + .append(1000L) // Copy of ROWTIME in value + .append(OLD_KEY); // Copy of key in value + } + + @Test + public void shouldBuildResultSchemaWhenPartitioningByColumnRef() { + // Given: + final Expression partitionBy = new UnqualifiedColumnReferenceExp(COL1); + + // When: + final LogicalSchema resultSchema = PartitionByParamsFactory.buildSchema( + SCHEMA, + partitionBy, + functionRegistry + ); + + // Then: + assertThat(resultSchema, is(LogicalSchema.builder() + .withRowTime() + .keyColumn(COL1, SqlTypes.INTEGER) + .valueColumn(COL1, SqlTypes.INTEGER) + .valueColumn(COL2, SqlTypes.INTEGER) + .valueColumn(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT) + .valueColumn(COL0, SqlTypes.STRING) + .build())); + } + + @Test + public void shouldBuildResultSchemaWhenPartitioningByOtherExpressionType() { + // Given: + final Expression partitionBy = new ArithmeticUnaryExpression( + Optional.empty(), + Sign.MINUS, + new UnqualifiedColumnReferenceExp(COL1) + ); + + // When: + final LogicalSchema resultSchema = PartitionByParamsFactory.buildSchema( + SCHEMA, + partitionBy, + functionRegistry + ); + + // Then: + assertThat(resultSchema, is(LogicalSchema.builder() + .withRowTime() + .keyColumn(ColumnName.of("KSQL_COL_4"), SqlTypes.INTEGER) + .valueColumn(COL1, SqlTypes.INTEGER) + .valueColumn(COL2, SqlTypes.INTEGER) + .valueColumn(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT) + .valueColumn(COL0, SqlTypes.STRING) + .valueColumn(ColumnName.of("KSQL_COL_4"), SqlTypes.INTEGER) + .build())); + } + + @Test + public void shouldLogOnErrorExtractingNewKey() { + // Given: + final BiPredicate predicate = partitionBy(FAILING_UDF) + .getPredicate(); + + // When: + predicate.test(key, value); + + // 
Then: + verify(logger).error(any()); + } + + @Test + public void shouldFilterOutRowsWhereFailedToExtractKey() { + // Given: + final BiPredicate predicate = partitionBy(FAILING_UDF) + .getPredicate(); + + // When: + final boolean result = predicate.test(key, value); + + // Then: + assertThat(result, is(false)); + } + + @Test + public void shouldSetNewKey() { + // Given: + final BiFunction> mapper = + partitionBy(new UnqualifiedColumnReferenceExp(COL1)).getMapper(); + + // When: + final KeyValue result = mapper.apply(key, value); + + // Then: + assertThat(result.key, is(StructKeyUtil.keyBuilder(COL1, SqlTypes.INTEGER).build(COL1_VALUE))); + } + + @Test + public void shouldNotChangeValueIfPartitioningByColumnReference() { + // Given: + final BiFunction> mapper = + partitionBy(new UnqualifiedColumnReferenceExp(COL1)).getMapper(); + + final ImmutableList originals = ImmutableList.copyOf(value.values()); + + // When: + final KeyValue result = mapper.apply(key, value); + + // Then: + assertThat(result.value, is(GenericRow.fromList(originals))); + } + + @Test + public void shouldAppendNewKeyColumnToValueIfNotPartitioningByColumnReference() { + // Given: + final BiFunction> mapper = + partitionBy(new FunctionCall( + CONSTANT_UDF_NAME, + ImmutableList.of(new UnqualifiedColumnReferenceExp(COL1))) + ).getMapper(); + + final ImmutableList originals = ImmutableList.copyOf(value.values()); + + // When: + final KeyValue result = mapper.apply(key, value); + + // Then: + assertThat(result.value, is(GenericRow.fromList(originals).append(ConstantUdf.VALUE))); + } + + private PartitionByParams partitionBy(final Expression expression) { + return PartitionByParamsFactory + .build(SCHEMA, expression, KSQL_CONFIG, functionRegistry, logger); + } + + public static class FailingUdf implements Kudf { + + @Override + public Object evaluate(final Object... args) { + throw new IllegalStateException(); + } + } + + public static class ConstantUdf implements Kudf { + + private static final long VALUE = 42L; + + @Override + public Object evaluate(final Object... 
args) { + return VALUE; + } + } +} \ No newline at end of file diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java index 1253f0a0c565..8b4d9bcf9381 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java @@ -43,6 +43,7 @@ import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; +import io.confluent.ksql.execution.plan.StreamSelectKeyV1; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamWindowedAggregate; import io.confluent.ksql.execution.plan.TableAggregate; @@ -281,12 +282,12 @@ public void shouldResolveSchemaForStreamGroupByKey() { } @Test - public void shouldResolveSchemaForStreamSelectKey() { + public void shouldResolveSchemaForStreamSelectKeyV1() { // Given: final Expression keyExpression = new UnqualifiedColumnReferenceExp(ColumnName.of("ORANGE")); - final StreamSelectKey step = new StreamSelectKey( + final StreamSelectKeyV1 step = new StreamSelectKeyV1( PROPERTIES, streamSource, keyExpression @@ -304,6 +305,30 @@ public void shouldResolveSchemaForStreamSelectKey() { )); } + @Test + public void shouldResolveSchemaForStreamSelectKeyV2() { + // Given: + final UnqualifiedColumnReferenceExp keyExpression = + new UnqualifiedColumnReferenceExp(ColumnName.of("ORANGE")); + + final StreamSelectKey step = new StreamSelectKey( + PROPERTIES, + streamSource, + keyExpression + ); + + // When: + final LogicalSchema result = resolver.resolve(step, SCHEMA); + + // Then: + assertThat(result, is(LogicalSchema.builder() + .withRowTime() + .keyColumn(keyExpression.getColumnName(), SqlTypes.INTEGER) + .valueColumns(SCHEMA.value()) + .build() + )); + } + @Test public void shouldResolveSchemaForStreamSource() { final StreamSource step = new StreamSource( diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java index 5b1c24e6c8a1..d38a2bf7ba22 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java @@ -15,8 +15,9 @@ package io.confluent.ksql.execution.streams; +import static io.confluent.ksql.GenericRow.genericRow; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -36,6 +37,7 @@ import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; @@ -46,8 +48,10 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.SchemaUtil; import org.apache.kafka.connect.data.Struct; +import 
org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Predicate; import org.junit.Before; import org.junit.Test; @@ -75,7 +79,7 @@ public class StreamSelectKeyBuilderTest { private static final LogicalSchema RESULT_SCHEMA = LogicalSchema.builder() .withRowTime() - .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.BIGINT) + .keyColumn(ColumnName.of("BOI"), SqlTypes.BIGINT) .valueColumn(ColumnName.of("BIG"), SqlTypes.BIGINT) .valueColumn(ColumnName.of("BOI"), SqlTypes.BIGINT) .valueColumn(ColumnName.of(SchemaUtil.ROWTIME_NAME.text()), SqlTypes.BIGINT) @@ -103,10 +107,12 @@ public class StreamSelectKeyBuilderTest { private KsqlQueryBuilder queryBuilder; @Mock private FunctionRegistry functionRegistry; + @Mock + private ProcessingLogger processingLogger; @Captor private ArgumentCaptor> predicateCaptor; @Captor - private ArgumentCaptor> keyValueMapperCaptor; + private ArgumentCaptor>> keyValueMapperCaptor; private final QueryContext queryContext = new QueryContext.Stacker().push("ya").getQueryContext(); @@ -117,10 +123,12 @@ public class StreamSelectKeyBuilderTest { @Before @SuppressWarnings("unchecked") public void init() { + when(queryBuilder.getProcessingLogger(any())).thenReturn(processingLogger); when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry); when(queryBuilder.getKsqlConfig()).thenReturn(new KsqlConfig(ImmutableMap.of())); - when(kstream.filter(any())).thenReturn(filteredKStream); - when(filteredKStream.selectKey(any(KeyValueMapper.class))).thenReturn(rekeyedKstream); + when(kstream.filter(any(), any(Named.class))).thenReturn(filteredKStream); + when(filteredKStream.map(any(KeyValueMapper.class), any(Named.class))) + .thenReturn(rekeyedKstream); when(sourceStep.build(any())).thenReturn( new KStreamHolder<>(kstream, SOURCE_SCHEMA, mock(KeySerdeFactory.class))); planBuilder = new KSPlanBuilder( @@ -143,8 +151,8 @@ public void shouldRekeyCorrectly() { // Then: final InOrder inOrder = Mockito.inOrder(kstream, filteredKStream, rekeyedKstream); - inOrder.verify(kstream).filter(any()); - inOrder.verify(filteredKStream).selectKey(any()); + inOrder.verify(kstream).filter(any(), any(Named.class)); + inOrder.verify(filteredKStream).map(any(), any(Named.class)); inOrder.verifyNoMoreInteractions(); assertThat(result.getStream(), is(rekeyedKstream)); } @@ -160,6 +168,7 @@ public void shouldReturnCorrectSerdeFactory() { PhysicalSchema.from(SOURCE_SCHEMA, SerdeOption.none()), queryContext ); + verify(queryBuilder).buildKeySerde( FormatInfo.of(FormatFactory.JSON.name()), PhysicalSchema.from(SOURCE_SCHEMA, SerdeOption.none()), @@ -168,65 +177,74 @@ public void shouldReturnCorrectSerdeFactory() { @Test public void shouldFilterOutNullValues() { - // When: + // Given: selectKey.build(planBuilder); - // Then: - verify(kstream).filter(predicateCaptor.capture()); final Predicate predicate = getPredicate(); - assertThat(predicate.test(SOURCE_KEY, null), is(false)); + + // When: + final boolean result = predicate.test(SOURCE_KEY, null); + + // Then: + assertThat(result, is(false)); } @Test public void shouldFilterOutNullKeyColumns() { - // When: + // Given: selectKey.build(planBuilder); - // Then: - verify(kstream).filter(predicateCaptor.capture()); final Predicate predicate = getPredicate(); - assertThat( - predicate.test(SOURCE_KEY, value(A_BIG, null, 0, "dre")), - is(false) - ); + + // When: + final boolean result = 
predicate.test(SOURCE_KEY, genericRow(A_BIG, null, 0, "dre")); + + // Then: + assertThat(result, is(false)); } @Test public void shouldNotFilterOutNonNullKeyColumns() { - // When: + // Given: selectKey.build(planBuilder); - // Then: - verify(kstream).filter(predicateCaptor.capture()); final Predicate predicate = getPredicate(); - assertThat( - predicate.test(SOURCE_KEY, value(A_BIG, A_BOI, 0, "dre")), - is(true) - ); + + // When: + final boolean result = predicate.test(SOURCE_KEY, genericRow(A_BIG, A_BOI, 0, "dre")); + + // Then: + assertThat(result, is(true)); } @Test public void shouldIgnoreNullNonKeyColumns() { - // When: + // Given: selectKey.build(planBuilder); - // Then: - verify(kstream).filter(predicateCaptor.capture()); final Predicate predicate = getPredicate(); - assertThat(predicate.test(SOURCE_KEY, value(null, A_BOI, 0, "dre")), is(true)); + + // When: + final boolean result = predicate.test(SOURCE_KEY, genericRow(null, A_BOI, 0, "dre")); + + // Then: + assertThat(result, is(true)); } @Test public void shouldComputeCorrectKey() { - // When: + // Given: selectKey.build(planBuilder); + final KeyValueMapper> keyValueMapper = + getKeyMapper(); + + // When: + final KeyValue result = keyValueMapper + .apply(SOURCE_KEY, genericRow(A_BIG, A_BOI, 0, "dre")); + // Then: - final KeyValueMapper keyValueMapper = getKeyMapper(); - assertThat( - keyValueMapper.apply(SOURCE_KEY, value(A_BIG, A_BOI, 0, "dre")), - is(RESULT_KEY_BUILDER.build(A_BOI)) - ); + assertThat(result.key, is(RESULT_KEY_BUILDER.build(A_BOI))); } @Test @@ -238,22 +256,13 @@ public void shouldReturnCorrectSchema() { assertThat(result.getSchema(), is(RESULT_SCHEMA)); } - private KeyValueMapper getKeyMapper() { - verify(filteredKStream).selectKey(keyValueMapperCaptor.capture()); + private KeyValueMapper> getKeyMapper() { + verify(filteredKStream).map(keyValueMapperCaptor.capture(), any(Named.class)); return keyValueMapperCaptor.getValue(); } private Predicate getPredicate() { - verify(kstream).filter(predicateCaptor.capture()); + verify(kstream).filter(predicateCaptor.capture(), any(Named.class)); return predicateCaptor.getValue(); } - - private static GenericRow value( - final Long big, - final Long boi, - final int rowTime, - final String rowKey - ) { - return GenericRow.genericRow(big, boi, rowTime, rowKey); - } } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1Test.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1Test.java new file mode 100644 index 000000000000..eccb8954253f --- /dev/null +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1Test.java @@ -0,0 +1,259 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1; +import io.confluent.ksql.execution.plan.KStreamHolder; +import io.confluent.ksql.execution.plan.KeySerdeFactory; +import io.confluent.ksql.execution.plan.PlanBuilder; +import io.confluent.ksql.execution.plan.StreamSelectKeyV1; +import io.confluent.ksql.execution.util.StructKeyUtil; +import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.FormatFactory; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.SchemaUtil; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Predicate; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class StreamSelectKeyBuilderV1Test { + + private static final LogicalSchema SOURCE_SCHEMA = LogicalSchema.builder() + .withRowTime() + .keyColumn(ColumnName.of("k0"), SqlTypes.DOUBLE) + .valueColumn(ColumnName.of("BIG"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("BOI"), SqlTypes.BIGINT) + .build() + .withMetaAndKeyColsInValue(false); + + private static final UnqualifiedColumnReferenceExp KEY = + new UnqualifiedColumnReferenceExp(ColumnName.of("BOI")); + + private static final LogicalSchema RESULT_SCHEMA = LogicalSchema.builder() + .withRowTime() + .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.BIGINT) + .valueColumn(ColumnName.of("BIG"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("BOI"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of(SchemaUtil.ROWTIME_NAME.text()), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("k0"), SqlTypes.DOUBLE) + .build(); + + private static final KeyBuilder RESULT_KEY_BUILDER = StructKeyUtil.keyBuilder(RESULT_SCHEMA); + + private static final long A_BOI = 5000; + private static final long A_BIG = 3000; + + private static final Struct SOURCE_KEY = StructKeyUtil + .keyBuilder(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING) + .build("dre"); + + @Mock + private KStream kstream; + @Mock + private KStream rekeyedKstream; + @Mock + private KStream filteredKStream; + @Mock + private ExecutionStep> sourceStep; + @Mock + private KsqlQueryBuilder queryBuilder; + @Mock + private FunctionRegistry functionRegistry; + @Captor 
+ private ArgumentCaptor> predicateCaptor; + @Captor + private ArgumentCaptor> keyValueMapperCaptor; + + private final QueryContext queryContext = + new QueryContext.Stacker().push("ya").getQueryContext(); + + private PlanBuilder planBuilder; + private StreamSelectKeyV1 selectKey; + + @Before + @SuppressWarnings("unchecked") + public void init() { + when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry); + when(queryBuilder.getKsqlConfig()).thenReturn(new KsqlConfig(ImmutableMap.of())); + when(kstream.filter(any())).thenReturn(filteredKStream); + when(filteredKStream.selectKey(any(KeyValueMapper.class))).thenReturn(rekeyedKstream); + when(sourceStep.build(any())).thenReturn( + new KStreamHolder<>(kstream, SOURCE_SCHEMA, mock(KeySerdeFactory.class))); + planBuilder = new KSPlanBuilder( + queryBuilder, + mock(SqlPredicateFactory.class), + mock(AggregateParamsFactory.class), + mock(StreamsFactories.class) + ); + selectKey = new StreamSelectKeyV1( + new ExecutionStepPropertiesV1(queryContext), + sourceStep, + KEY + ); + } + + @Test + public void shouldRekeyCorrectly() { + // When: + final KStreamHolder result = selectKey.build(planBuilder); + + // Then: + final InOrder inOrder = Mockito.inOrder(kstream, filteredKStream, rekeyedKstream); + inOrder.verify(kstream).filter(any()); + inOrder.verify(filteredKStream).selectKey(any()); + inOrder.verifyNoMoreInteractions(); + assertThat(result.getStream(), is(rekeyedKstream)); + } + + @Test + public void shouldReturnCorrectSerdeFactory() { + // When: + final KStreamHolder result = selectKey.build(planBuilder); + + // Then: + result.getKeySerdeFactory().buildKeySerde( + FormatInfo.of(FormatFactory.JSON.name()), + PhysicalSchema.from(SOURCE_SCHEMA, SerdeOption.none()), + queryContext + ); + verify(queryBuilder).buildKeySerde( + FormatInfo.of(FormatFactory.JSON.name()), + PhysicalSchema.from(SOURCE_SCHEMA, SerdeOption.none()), + queryContext); + } + + @Test + public void shouldFilterOutNullValues() { + // When: + selectKey.build(planBuilder); + + // Then: + verify(kstream).filter(predicateCaptor.capture()); + final Predicate predicate = getPredicate(); + assertThat(predicate.test(SOURCE_KEY, null), is(false)); + } + + @Test + public void shouldFilterOutNullKeyColumns() { + // When: + selectKey.build(planBuilder); + + // Then: + verify(kstream).filter(predicateCaptor.capture()); + final Predicate predicate = getPredicate(); + assertThat( + predicate.test(SOURCE_KEY, value(A_BIG, null, 0, "dre")), + is(false) + ); + } + + @Test + public void shouldNotFilterOutNonNullKeyColumns() { + // When: + selectKey.build(planBuilder); + + // Then: + verify(kstream).filter(predicateCaptor.capture()); + final Predicate predicate = getPredicate(); + assertThat( + predicate.test(SOURCE_KEY, value(A_BIG, A_BOI, 0, "dre")), + is(true) + ); + } + + @Test + public void shouldIgnoreNullNonKeyColumns() { + // When: + selectKey.build(planBuilder); + + // Then: + verify(kstream).filter(predicateCaptor.capture()); + final Predicate predicate = getPredicate(); + assertThat(predicate.test(SOURCE_KEY, value(null, A_BOI, 0, "dre")), is(true)); + } + + @Test + public void shouldComputeCorrectKey() { + // When: + selectKey.build(planBuilder); + + // Then: + final KeyValueMapper keyValueMapper = getKeyMapper(); + assertThat( + keyValueMapper.apply(SOURCE_KEY, value(A_BIG, A_BOI, 0, "dre")), + is(RESULT_KEY_BUILDER.build(A_BOI)) + ); + } + + @Test + public void shouldReturnCorrectSchema() { + // When: + final KStreamHolder result = selectKey.build(planBuilder); + + // Then: 
+ assertThat(result.getSchema(), is(RESULT_SCHEMA)); + } + + private KeyValueMapper getKeyMapper() { + verify(filteredKStream).selectKey(keyValueMapperCaptor.capture()); + return keyValueMapperCaptor.getValue(); + } + + private Predicate getPredicate() { + verify(kstream).filter(predicateCaptor.capture()); + return predicateCaptor.getValue(); + } + + private static GenericRow value( + final Long big, + final Long boi, + final int rowTime, + final String rowKey + ) { + return GenericRow.genericRow(big, boi, rowTime, rowKey); + } +} From 8e35587a5d918307a002546289414b63fa829624 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Fri, 20 Mar 2020 11:09:49 +0000 Subject: [PATCH 2/2] chore: changes requested by Almog --- .../query-validation-tests/partition-by.json | 20 +++++++++++++++++++ .../streams/PartitionByParamsFactory.java | 20 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json index be273c741e9d..b48ad07b4f2b 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -40,6 +40,26 @@ ] } }, + { + "name": "expression - select star", + "statements": [ + "CREATE STREAM INPUT (ID INT KEY, NAME STRING) with (kafka_topic='input', value_format='JSON');", + "CREATE STREAM OUTPUT AS select * from INPUT partition by ABS(ID);" + ], + "properties": { + "ksql.any.key.name.enabled": true + }, + "inputs": [{"topic": "input", "key": 10, "value": {"NAME": "bob"}}], + "outputs": [{"topic": "OUTPUT", "key": 10, "value": {"NAME": "bob", "ID": 10}}], + "post": { + "topics": { + "blacklist": ".*-repartition" + }, + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "KSQL_COL_0 INT KEY, NAME STRING, ID INT"} + ] + } + }, { "name": "only key column - select columns", "statements": [ diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java index 98ecfc27bd63..5e318f05207f 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java @@ -40,6 +40,26 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; +/** + * Factory for PartitionByParams. + * + *
<p>Behaviour differs slightly depending on whether PARTITIONing BY a column reference or some
+ * other expression:
+ *
+ * <p>When PARTITIONing BY a column reference, the existing key column(s) are moved into the value
+ * schema and the new key column is moved to the key, e.g. logically {@code A => B, C}, when
+ * {@code PARTITION BY B}, becomes {@code B => C, A}. However, processing schemas contain a copy of
+ * the key columns in the value, so actually {@code A => B, C, A} becomes {@code B => B, C, A}.
+ * Note: the value columns do not need to change.
+ *
+ * <p>When PARTITIONing BY any other type of expression, no column can be removed from the logical
+ * schema's value columns, as the PARTITION BY expression is creating a new column. Hence, the
+ * existing key column(s) are moved to the value schema and a new key column is added, e.g.
+ * logically {@code A => B, C}, when {@code PARTITION BY exp}, becomes
+ * {@code KSQL_COL_0 => B, C, A}. However, processing schemas contain a copy of the key columns in
+ * the value, so actually {@code A => B, C, A} becomes {@code KSQL_COL_0 => B, C, A, KSQL_COL_0}.
+ * Note: the value columns only have the new key column added.
+ */
 public final class PartitionByParamsFactory {
 
   private PartitionByParamsFactory() {
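For context, a minimal usage sketch of how the `PartitionByParams` built by this factory are applied to a stream, mirroring `StreamSelectKeyBuilder.build` in this patch. This is illustrative only, not part of the change: the `stream`, `sourceSchema`, `partitionBy`, `ksqlConfig`, `functionRegistry`, and `logger` names are assumed to be in scope.

```java
// Illustrative only: stream, sourceSchema, partitionBy, ksqlConfig,
// functionRegistry and logger are assumed to be in scope.
final PartitionByParams params = PartitionByParamsFactory.build(
    sourceSchema, partitionBy, ksqlConfig, functionRegistry, logger);

// Drop rows whose new key evaluates to null, then re-key each remaining row.
// When the PARTITION BY expression is not a column reference, the mapper also
// appends the synthesized key column to the value.
final KStream<Struct, GenericRow> reKeyed = stream
    .filter(params.getPredicate()::test)
    .map(params.getMapper()::apply);

// reKeyed now conforms to params.getSchema().
```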