diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/name/ColumnNames.java b/ksqldb-common/src/main/java/io/confluent/ksql/name/ColumnNames.java
index 54f38efcdb96..dd88e59919c6 100644
--- a/ksqldb-common/src/main/java/io/confluent/ksql/name/ColumnNames.java
+++ b/ksqldb-common/src/main/java/io/confluent/ksql/name/ColumnNames.java
@@ -1,8 +1,8 @@
/*
* Copyright 2020 Confluent Inc.
*
- * Licensed under the Confluent Community License (the "License"; you may not use
- * this file except in compliance with the License. You may obtain a copy of the
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
@@ -15,12 +15,21 @@
package io.confluent.ksql.name;
+import io.confluent.ksql.schema.ksql.Column;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import java.util.OptionalInt;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public final class ColumnNames {
private static final String AGGREGATE_COLUMN_PREFIX = "KSQL_AGG_VARIABLE_";
private static final String GENERATED_ALIAS_PREFIX = "KSQL_COL_";
private static final String SYNTHESISED_COLUMN_PREFIX = "KSQL_SYNTH_";
+ private static final Pattern GENERATED_ALIAS_PATTERN = Pattern
+ .compile(GENERATED_ALIAS_PREFIX + "(\\d+)");
+
private ColumnNames() {
}
@@ -32,24 +41,24 @@ public static ColumnName aggregateColumn(final int idx) {
}
/**
- * Where the user hasn't specified an alias for an expression in a SELECT we generate them
- * using this method. This value is exposed to the user in the output schema
+ * Where the user hasn't specified an alias for an expression in a SELECT we generate them using
+ * this method. This value is exposed to the user in the output schema
*/
public static ColumnName generatedColumnAlias(final int idx) {
return ColumnName.of(GENERATED_ALIAS_PREFIX + idx);
}
/**
- * Used to generate a column name in an intermediate schema, e.g. for a column to hold
- * values of a table function. These are never exposed to the user
+ * Used to generate a column name in an intermediate schema, e.g. for a column to hold values of a
+ * table function. These are never exposed to the user
*/
public static ColumnName synthesisedSchemaColumn(final int idx) {
return ColumnName.of(SYNTHESISED_COLUMN_PREFIX + idx);
}
/**
- * Used to generate a column alias for a join where the a column with this name exists
- * in both of the sources.
+ * Used to generate a column alias for a join where the a column with this name exists in both of
+ * the sources.
*/
public static ColumnName generatedJoinColumnAlias(
final SourceName sourceName,
@@ -61,4 +70,42 @@ public static ColumnName generatedJoinColumnAlias(
public static boolean isAggregate(final ColumnName name) {
return name.text().startsWith(AGGREGATE_COLUMN_PREFIX);
}
+
+ /**
+ * Determines the next unique column alias.
+ *
+ *
Finds any existing {@code KSQL_COL_x} column names in the supplied {@code sourceSchema} to
+ * ensure the returned generated column name is unique.
+ *
+ * @param sourceSchema the source schema.
+ * @return a column name in the form {@code KSQL_COL_x} which does not clash with source schema.
+ */
+ public static ColumnName nextGeneratedColumnAlias(final LogicalSchema sourceSchema) {
+ final int maxExistingIdx = maxGeneratedAliasIndex(sourceSchema);
+ return generatedColumnAlias(maxExistingIdx + 1);
+ }
+
+ /**
+ * Determines the highest index of generated column names like {@code KSQL_COL_x} in the supplied
+ * {@code sourceSchema}.
+ *
+ * @param sourceSchema the schema.
+ * @return the highest index or {@code -1}
+ */
+ private static int maxGeneratedAliasIndex(final LogicalSchema sourceSchema) {
+ return sourceSchema.columns().stream()
+ .map(Column::name)
+ .map(ColumnNames::extractGeneratedAliasIndex)
+ .filter(OptionalInt::isPresent)
+ .mapToInt(OptionalInt::getAsInt)
+ .max()
+ .orElse(-1);
+ }
+
+ private static OptionalInt extractGeneratedAliasIndex(final ColumnName columnName) {
+ final Matcher matcher = GENERATED_ALIAS_PATTERN.matcher(columnName.text());
+ return matcher.matches()
+ ? OptionalInt.of(Integer.parseInt(matcher.group(1)))
+ : OptionalInt.empty();
+ }
}
diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/name/ColumnNamesTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/name/ColumnNamesTest.java
new file mode 100644
index 000000000000..89aa4d757871
--- /dev/null
+++ b/ksqldb-common/src/test/java/io/confluent/ksql/name/ColumnNamesTest.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.name;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.types.SqlTypes;
+import org.junit.Test;
+
+public class ColumnNamesTest {
+
+ @Test
+ public void shouldGenerateUniqueAliasesStartingAtZero() {
+ // Given:
+ final LogicalSchema schema = LogicalSchema.builder()
+ .build();
+
+ // When:
+ final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema);
+
+ // Then:
+ assertThat(result, is(ColumnName.of("KSQL_COL_0")));
+ }
+
+ @Test
+ public void shouldGenerateUniqueAliasesTakingAnyKeyColumnsIntoAccount() {
+ // Given:
+ final LogicalSchema schema = LogicalSchema.builder()
+ .keyColumn(ColumnName.of("Fred"), SqlTypes.STRING)
+ .keyColumn(ColumnNames.generatedColumnAlias(1), SqlTypes.STRING)
+ .keyColumn(ColumnName.of("George"), SqlTypes.STRING)
+ .build();
+
+ // When:
+ final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema);
+
+ // Then:
+ assertThat(result, is(ColumnName.of("KSQL_COL_2")));
+ }
+
+ @Test
+ public void shouldGenerateUniqueAliasesTakingAnyValueColumnsIntoAccount() {
+ // Given:
+ final LogicalSchema schema = LogicalSchema.builder()
+ .valueColumn(ColumnName.of("Fred"), SqlTypes.STRING)
+ .valueColumn(ColumnNames.generatedColumnAlias(1), SqlTypes.STRING)
+ .valueColumn(ColumnName.of("George"), SqlTypes.STRING)
+ .build();
+
+ // When:
+ final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema);
+
+ // Then:
+ assertThat(result, is(ColumnName.of("KSQL_COL_2")));
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
index 47d6dd2eddc5..69201a613165 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
@@ -31,6 +31,7 @@
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.execution.plan.SelectExpression;
+import io.confluent.ksql.execution.streams.PartitionByParamsFactory;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.execution.util.ExpressionTypeManager;
@@ -552,16 +553,25 @@ private LogicalSchema buildRepartitionedSchema(
return sourceSchema;
}
- final ExpressionTypeManager typeManager =
- new ExpressionTypeManager(sourceSchema, functionRegistry);
+ if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) {
+ final ExpressionTypeManager expressionTypeManager =
+ new ExpressionTypeManager(sourceSchema, functionRegistry);
- final SqlType keyType = typeManager.getExpressionSqlType(partitionBy);
+ final SqlType keyType = expressionTypeManager
+ .getExpressionSqlType(partitionBy);
- return LogicalSchema.builder()
- .withRowTime()
- .keyColumn(SchemaUtil.ROWKEY_NAME, keyType)
- .valueColumns(sourceSchema.value())
- .build();
+ return LogicalSchema.builder()
+ .withRowTime()
+ .keyColumn(SchemaUtil.ROWKEY_NAME, keyType)
+ .valueColumns(sourceSchema.value())
+ .build();
+ }
+
+ return PartitionByParamsFactory.buildSchema(
+ sourceSchema,
+ partitionBy,
+ functionRegistry
+ );
}
private static boolean exactlyMatchesKeyColumns(
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java
index d6d168c47473..ed98fc10a4d6 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java
@@ -23,10 +23,15 @@
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.metastore.model.KeyField;
+import io.confluent.ksql.name.ColumnName;
+import io.confluent.ksql.name.SourceName;
+import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKStream;
import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
@Immutable
public class RepartitionNode extends PlanNode {
@@ -78,4 +83,19 @@ public Expression getPartitionBy() {
public R accept(final PlanVisitor visitor, final C context) {
return visitor.visitRepartition(this, context);
}
+
+ @Override
+ public Stream resolveSelectStar(
+ final Optional sourceName,
+ final boolean valueOnly
+ ) {
+ final boolean sourceNameMatches = !sourceName.isPresent() || sourceName.equals(getSourceName());
+ if (sourceNameMatches && valueOnly) {
+ // Override set of value columns to take into account the repartition:
+ return getSchema().withoutMetaAndKeyColsInValue().value().stream().map(Column::name);
+ }
+
+ // Set of all columns not changed by a repartition:
+ return super.resolveSelectStar(sourceName, valueOnly);
+ }
}
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
index f31c107274ae..a20790af504a 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
@@ -34,7 +34,6 @@
import io.confluent.ksql.execution.plan.StreamGroupBy;
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.execution.plan.StreamSelect;
-import io.confluent.ksql.execution.plan.StreamSelectKey;
import io.confluent.ksql.execution.plan.StreamSink;
import io.confluent.ksql.execution.plan.StreamStreamJoin;
import io.confluent.ksql.execution.plan.StreamTableJoin;
@@ -191,8 +190,6 @@ KeyField findKeyField(final List selectExpressions) {
final Optional filtered = found
// System columns can not be key fields:
.filter(f -> !SchemaUtil.isSystemColumn(f.name()))
- // Key columns can not be key fields:
- .filter(f -> !schema.isKeyColumn(f.name()))
.map(Column::name);
return KeyField.of(filtered);
@@ -338,11 +335,10 @@ public SchemaKStream selectKey(
+ "See https://github.com/confluentinc/ksql/issues/4385.");
}
- final StreamSelectKey step = ExecutionStepFactory.streamSelectKey(
- contextStacker,
- sourceStep,
- keyExpression
- );
+ final ExecutionStep> step = ksqlConfig
+ .getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)
+ ? ExecutionStepFactory.streamSelectKey(contextStacker, sourceStep, keyExpression)
+ : ExecutionStepFactory.streamSelectKeyV1(contextStacker, sourceStep, keyExpression);
return new SchemaKStream<>(
step,
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java
index ba54a44ad0da..ebd64b60f626 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java
@@ -26,6 +26,7 @@
import io.confluent.ksql.execution.plan.StreamGroupBy;
import io.confluent.ksql.execution.plan.StreamSelect;
import io.confluent.ksql.execution.plan.StreamSelectKey;
+import io.confluent.ksql.execution.plan.StreamSelectKeyV1;
import io.confluent.ksql.execution.plan.StreamSink;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.StreamStreamJoin;
@@ -67,6 +68,7 @@ public class PlanSummary {
.put(StreamFlatMap.class, "FLAT_MAP")
.put(StreamGroupBy.class, "GROUP_BY")
.put(StreamSelect.class, "PROJECT")
+ .put(StreamSelectKeyV1.class, "REKEY")
.put(StreamSelectKey.class, "REKEY")
.put(StreamSink.class, "SINK")
.put(StreamSource.class, "SOURCE")
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/RepartitionNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/RepartitionNodeTest.java
new file mode 100644
index 000000000000..19653cc426f8
--- /dev/null
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/RepartitionNodeTest.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.planner.plan;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
+import io.confluent.ksql.metastore.model.KeyField;
+import io.confluent.ksql.name.ColumnName;
+import io.confluent.ksql.name.SourceName;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.types.SqlTypes;
+import io.confluent.ksql.util.SchemaUtil;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RepartitionNodeTest {
+
+ private static final PlanNodeId PLAN_ID = new PlanNodeId("Blah") ;
+ private static final SourceName MATCHING_SOURCE_NAME = SourceName.of("S1");
+ private static final ColumnName K0 = ColumnName.of("K");
+ private static final ColumnName V0 = ColumnName.of("V0");
+ private static final ColumnName V1 = ColumnName.of("V1");
+ private static final ColumnName V2 = ColumnName.of("V2");
+
+ private static final LogicalSchema SCHEMA = LogicalSchema.builder()
+ .withRowTime()
+ .keyColumn(K0, SqlTypes.DOUBLE)
+ .valueColumn(V0, SqlTypes.DOUBLE)
+ .valueColumn(V1, SqlTypes.DOUBLE)
+ .valueColumn(V2, SqlTypes.DOUBLE)
+ .valueColumn(K0, SqlTypes.DOUBLE)
+ .valueColumn(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT)
+ .build();
+
+ private static final List PARENT_COL_NAMES = ImmutableList.of(
+ ColumnName.of("this"),
+ ColumnName.of("that")
+ );
+
+ @Mock
+ private PlanNode parent;
+ @Mock
+ private Expression partitionBy;
+ private RepartitionNode repartitionNode;
+
+ @Before
+ public void setUp() {
+ when(parent.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM);
+ when(parent.getSourceName()).thenReturn(Optional.of(MATCHING_SOURCE_NAME));
+ when(parent.resolveSelectStar(any(), anyBoolean())).thenReturn(PARENT_COL_NAMES.stream());
+
+ repartitionNode = new RepartitionNode(PLAN_ID, parent, SCHEMA, partitionBy, KeyField.none());
+ }
+
+ @Test
+ public void shouldResolveSelectStarIfSourceMatchesAndValuesOnly() {
+ // When:
+ final Stream result = repartitionNode.resolveSelectStar(
+ Optional.of(MATCHING_SOURCE_NAME),
+ true
+ );
+
+ // Then:
+ final List names = result.collect(Collectors.toList());
+ assertThat(names, contains(V0, V1, V2));
+ }
+
+ @Test
+ public void shouldResolveSelectStarIfSourceNotProvidedAndValuesOnly() {
+ // When:
+ final Stream result = repartitionNode.resolveSelectStar(
+ Optional.empty(),
+ true
+ );
+
+ // Then:
+ final List names = result.collect(Collectors.toList());
+ assertThat(names, contains(V0, V1, V2));
+ }
+
+ @Test
+ public void shouldPassResolveSelectStarToParentIfNotValuesOnly() {
+ // When:
+ final Stream result = repartitionNode.resolveSelectStar(
+ Optional.empty(),
+ false
+ );
+
+ // Then:
+ final List names = result.collect(Collectors.toList());
+ assertThat(names, is(PARENT_COL_NAMES));
+
+ verify(parent).resolveSelectStar(Optional.empty(), false);
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
index 20d3488c5752..3ad6e9675df5 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
@@ -76,7 +76,6 @@
@RunWith(MockitoJUnitRunner.class)
public class SchemaKStreamTest {
- private static final SourceName TEST1 = SourceName.of("TEST1");
private static final Expression COL1 =
new UnqualifiedColumnReferenceExp(ColumnName.of("COL1"));
@@ -513,7 +512,7 @@ public void shouldBuildStepForSelectKey() {
assertThat(
rekeyedSchemaKStream.getSourceStep(),
equalTo(
- ExecutionStepFactory.streamSelectKey(
+ ExecutionStepFactory.streamSelectKeyV1(
childContextStacker,
initialSchemaKStream.getSourceStep(),
new UnqualifiedColumnReferenceExp(ColumnName.of("COL1"))
diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/builder/KsqlQueryBuilder.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/builder/KsqlQueryBuilder.java
index f29fe7504f9a..33bb2f1ea65c 100644
--- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/builder/KsqlQueryBuilder.java
+++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/builder/KsqlQueryBuilder.java
@@ -23,6 +23,7 @@
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
+import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
@@ -98,6 +99,12 @@ public ProcessingLogContext getProcessingLogContext() {
return processingLogContext;
}
+ public ProcessingLogger getProcessingLogger(final QueryContext queryContext) {
+ return processingLogContext
+ .getLoggerFactory()
+ .getLogger(QueryLoggerUtil.queryLoggerName(queryId, queryContext));
+ }
+
public ServiceContext getServiceContext() {
return serviceContext;
}
diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java
index 44bcab20f2c6..bd08466c8f0b 100644
--- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java
+++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java
@@ -33,7 +33,8 @@
@Type(value = StreamGroupBy.class, name = "streamGroupByV1"),
@Type(value = StreamGroupByKey.class, name = "streamGroupByKeyV1"),
@Type(value = StreamSelect.class, name = "streamSelectV1"),
- @Type(value = StreamSelectKey.class, name = "streamSelectKeyV1"),
+ @Type(value = StreamSelectKeyV1.class, name = "streamSelectKeyV1"),
+ @Type(value = StreamSelectKey.class, name = "streamSelectKeyV2"),
@Type(value = StreamSink.class, name = "streamSinkV1"),
@Type(value = StreamSource.class, name = "streamSourceV1"),
@Type(value = WindowedStreamSource.class, name = "windowedStreamSourceV1"),
diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java
index 81c1ca94cb0e..c9510a304e44 100644
--- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java
+++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java
@@ -37,6 +37,8 @@ public interface PlanBuilder {
KStreamHolder visitFlatMap(StreamFlatMap streamFlatMap);
+ KStreamHolder visitStreamSelectKey(StreamSelectKeyV1 streamSelectKey);
+
KStreamHolder visitStreamSelectKey(StreamSelectKey streamSelectKey);
KStreamHolder visitStreamSink(StreamSink streamSink);
diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1.java
new file mode 100644
index 000000000000..f08007d030c9
--- /dev/null
+++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.plan;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.errorprone.annotations.Immutable;
+import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.testing.EffectivelyImmutable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.kafka.connect.data.Struct;
+
+@Immutable
+public class StreamSelectKeyV1 implements ExecutionStep> {
+
+ private final ExecutionStepPropertiesV1 properties;
+ private final Expression keyExpression;
+ @EffectivelyImmutable
+ private final ExecutionStep extends KStreamHolder>> source;
+
+ public StreamSelectKeyV1(
+ @JsonProperty(value = "properties", required = true) final ExecutionStepPropertiesV1 props,
+ @JsonProperty(value = "source", required = true) final
+ ExecutionStep extends KStreamHolder>> source,
+ @JsonProperty(value = "keyExpression", required = true) final Expression keyExpression
+ ) {
+ this.properties = Objects.requireNonNull(props, "props");
+ this.source = Objects.requireNonNull(source, "source");
+ this.keyExpression = Objects.requireNonNull(keyExpression, "keyExpression");
+ }
+
+ @Override
+ public ExecutionStepPropertiesV1 getProperties() {
+ return properties;
+ }
+
+ @Override
+ @JsonIgnore
+ public List> getSources() {
+ return Collections.singletonList(source);
+ }
+
+ public Expression getKeyExpression() {
+ return keyExpression;
+ }
+
+ public ExecutionStep extends KStreamHolder>> getSource() {
+ return source;
+ }
+
+ @Override
+ public KStreamHolder build(final PlanBuilder builder) {
+ return builder.visitStreamSelectKey(this);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final StreamSelectKeyV1 that = (StreamSelectKeyV1) o;
+ return Objects.equals(properties, that.properties)
+ && Objects.equals(source, that.source);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties, source);
+ }
+}
diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyTest.java
index d69f712cf060..8c508bd96c51 100644
--- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyTest.java
+++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyTest.java
@@ -37,6 +37,7 @@ public class StreamSelectKeyTest {
@Mock
private Expression expression2;
+ @SuppressWarnings("UnstableApiUsage")
@Test
public void shouldImplementEquals() {
new EqualsTester()
diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1Test.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1Test.java
new file mode 100644
index 000000000000..54debe4640fd
--- /dev/null
+++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSelectKeyV1Test.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.plan;
+
+import com.google.common.testing.EqualsTester;
+import io.confluent.ksql.execution.expression.tree.Expression;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StreamSelectKeyV1Test {
+ @Mock
+ private ExecutionStepPropertiesV1 properties1;
+ @Mock
+ private ExecutionStepPropertiesV1 properties2;
+ @Mock
+ private ExecutionStep> source1;
+ @Mock
+ private ExecutionStep> source2;
+ @Mock
+ private Expression expression1;
+ @Mock
+ private Expression expression2;
+
+ @SuppressWarnings("UnstableApiUsage")
+ @Test
+ public void shouldImplementEquals() {
+ new EqualsTester()
+ .addEqualityGroup(
+ new StreamSelectKeyV1(properties1, source1, expression1),
+ new StreamSelectKeyV1(properties1, source1, expression1))
+ .addEqualityGroup(new StreamSelectKeyV1(properties2, source1, expression1))
+ .addEqualityGroup(new StreamSelectKeyV1(properties1, source2, expression1))
+ .addEqualityGroup(new StreamSelectKeyV1(properties1, source1, expression2));
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/plan.json
new file mode 100644
index 000000000000..afbb63470573
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/plan.json
@@ -0,0 +1,251 @@
+{
+ "plan" : [ {
+ "@type" : "ksqlPlanV1",
+ "statementText" : "CREATE STREAM L (ID DOUBLE) WITH (KAFKA_TOPIC='left_topic', VALUE_FORMAT='JSON');",
+ "ddlCommand" : {
+ "@type" : "createStreamV1",
+ "sourceName" : "L",
+ "schema" : "`ROWKEY` STRING KEY, `ID` DOUBLE",
+ "keyField" : null,
+ "timestampColumn" : null,
+ "topicName" : "left_topic",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "KAFKA",
+ "properties" : { }
+ },
+ "valueFormat" : {
+ "format" : "JSON",
+ "properties" : { }
+ },
+ "options" : [ ]
+ },
+ "windowInfo" : null
+ },
+ "queryPlan" : null
+ }, {
+ "@type" : "ksqlPlanV1",
+ "statementText" : "CREATE STREAM R (ID INTEGER) WITH (KAFKA_TOPIC='right_topic', VALUE_FORMAT='JSON');",
+ "ddlCommand" : {
+ "@type" : "createStreamV1",
+ "sourceName" : "R",
+ "schema" : "`ROWKEY` STRING KEY, `ID` INTEGER",
+ "keyField" : null,
+ "timestampColumn" : null,
+ "topicName" : "right_topic",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "KAFKA",
+ "properties" : { }
+ },
+ "valueFormat" : {
+ "format" : "JSON",
+ "properties" : { }
+ },
+ "options" : [ ]
+ },
+ "windowInfo" : null
+ },
+ "queryPlan" : null
+ }, {
+ "@type" : "ksqlPlanV1",
+ "statementText" : "CREATE STREAM OUTPUT AS SELECT L.ID L_ID\nFROM L L\nINNER JOIN R R WITHIN 30 SECONDS ON ((CAST(L.ID AS INTEGER) = R.ID))\nEMIT CHANGES",
+ "ddlCommand" : {
+ "@type" : "createStreamV1",
+ "sourceName" : "OUTPUT",
+ "schema" : "`ROWKEY` INTEGER KEY, `L_ID` DOUBLE",
+ "keyField" : null,
+ "timestampColumn" : null,
+ "topicName" : "OUTPUT",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "KAFKA",
+ "properties" : { }
+ },
+ "valueFormat" : {
+ "format" : "JSON",
+ "properties" : { }
+ },
+ "options" : [ ]
+ },
+ "windowInfo" : null
+ },
+ "queryPlan" : {
+ "sources" : [ "R", "L" ],
+ "sink" : "OUTPUT",
+ "physicalPlan" : {
+ "@type" : "streamSinkV1",
+ "properties" : {
+ "queryContext" : "OUTPUT"
+ },
+ "source" : {
+ "@type" : "streamSelectV1",
+ "properties" : {
+ "queryContext" : "Project"
+ },
+ "source" : {
+ "@type" : "streamStreamJoinV1",
+ "properties" : {
+ "queryContext" : "Join"
+ },
+ "joinType" : "INNER",
+ "leftInternalFormats" : {
+ "keyFormat" : {
+ "format" : "KAFKA",
+ "properties" : { }
+ },
+ "valueFormat" : {
+ "format" : "JSON",
+ "properties" : { }
+ },
+ "options" : [ ]
+ },
+ "rightInternalFormats" : {
+ "keyFormat" : {
+ "format" : "KAFKA",
+ "properties" : { }
+ },
+ "valueFormat" : {
+ "format" : "JSON",
+ "properties" : { }
+ },
+ "options" : [ ]
+ },
+ "leftSource" : {
+ "@type" : "streamSelectV1",
+ "properties" : {
+ "queryContext" : "PrependAliasLeft"
+ },
+ "source" : {
+ "@type" : "streamSelectKeyV1",
+ "properties" : {
+ "queryContext" : "LeftSourceKeyed"
+ },
+ "source" : {
+ "@type" : "streamSourceV1",
+ "properties" : {
+ "queryContext" : "KafkaTopic_Left/Source"
+ },
+ "topicName" : "left_topic",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "KAFKA",
+ "properties" : { }
+ },
+ "valueFormat" : {
+ "format" : "JSON",
+ "properties" : { }
+ },
+ "options" : [ ]
+ },
+ "timestampColumn" : null,
+ "sourceSchema" : "`ROWKEY` STRING KEY, `ID` DOUBLE"
+ },
+ "keyExpression" : "CAST(ID AS INTEGER)"
+ },
+ "selectExpressions" : [ "ID AS L_ID", "ROWTIME AS L_ROWTIME", "ROWKEY AS L_ROWKEY" ]
+ },
+ "rightSource" : {
+ "@type" : "streamSelectV1",
+ "properties" : {
+ "queryContext" : "PrependAliasRight"
+ },
+ "source" : {
+ "@type" : "streamSelectKeyV1",
+ "properties" : {
+ "queryContext" : "RightSourceKeyed"
+ },
+ "source" : {
+ "@type" : "streamSourceV1",
+ "properties" : {
+ "queryContext" : "KafkaTopic_Right/Source"
+ },
+ "topicName" : "right_topic",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "KAFKA",
+ "properties" : { }
+ },
+ "valueFormat" : {
+ "format" : "JSON",
+ "properties" : { }
+ },
+ "options" : [ ]
+ },
+ "timestampColumn" : null,
+ "sourceSchema" : "`ROWKEY` STRING KEY, `ID` INTEGER"
+ },
+ "keyExpression" : "ID"
+ },
+ "selectExpressions" : [ "ID AS R_ID", "ROWTIME AS R_ROWTIME", "ROWKEY AS R_ROWKEY" ]
+ },
+ "beforeMillis" : 30.000000000,
+ "afterMillis" : 30.000000000
+ },
+ "selectExpressions" : [ "L_ID AS L_ID" ]
+ },
+ "formats" : {
+ "keyFormat" : {
+ "format" : "KAFKA",
+ "properties" : { }
+ },
+ "valueFormat" : {
+ "format" : "JSON",
+ "properties" : { }
+ },
+ "options" : [ ]
+ },
+ "topicName" : "OUTPUT",
+ "timestampColumn" : null
+ },
+ "queryId" : "CSAS_OUTPUT_0"
+ }
+ } ],
+ "configs" : {
+ "ksql.extension.dir" : "ext",
+ "ksql.streams.cache.max.bytes.buffering" : "0",
+ "ksql.security.extension.class" : null,
+ "ksql.transient.prefix" : "transient_",
+ "ksql.persistence.wrap.single.values" : "true",
+ "ksql.authorization.cache.expiry.time.secs" : "30",
+ "ksql.schema.registry.url" : "",
+ "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
+ "ksql.output.topic.name.prefix" : "",
+ "ksql.streams.auto.offset.reset" : "earliest",
+ "ksql.query.pull.enable.standby.reads" : "false",
+ "ksql.connect.url" : "http://localhost:8083",
+ "ksql.service.id" : "some.ksql.service.id",
+ "ksql.internal.topic.min.insync.replicas" : "1",
+ "ksql.streams.shutdown.timeout.ms" : "300000",
+ "ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent6364076579994155926",
+ "ksql.internal.topic.replicas" : "1",
+ "ksql.insert.into.values.enabled" : "true",
+ "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
+ "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
+ "ksql.access.validator.enable" : "auto",
+ "ksql.streams.bootstrap.servers" : "localhost:0",
+ "ksql.streams.commit.interval.ms" : "2000",
+ "ksql.metric.reporters" : "",
+ "ksql.query.pull.metrics.enabled" : "false",
+ "ksql.authentication.plugin.class" : null,
+ "ksql.streams.auto.commit.interval.ms" : "0",
+ "ksql.metrics.extension" : null,
+ "ksql.streams.topology.optimization" : "all",
+ "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
+ "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
+ "ksql.streams.num.stream.threads" : "4",
+ "ksql.timestamp.throw.on.invalid" : "false",
+ "ksql.authorization.cache.max.entries" : "10000",
+ "ksql.metrics.tags.custom" : "",
+ "ksql.pull.queries.enable" : "true",
+ "ksql.udfs.enabled" : "true",
+ "ksql.udf.enable.security.manager" : "true",
+ "ksql.connect.worker.config" : "",
+ "ksql.any.key.name.enabled" : "false",
+ "ksql.sink.window.change.log.additional.retention" : "1000000",
+ "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
+ "ksql.udf.collect.metrics" : "false",
+ "ksql.persistent.prefix" : "query_",
+ "ksql.query.persistent.active.limit" : "2147483647"
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/spec.json
new file mode 100644
index 000000000000..a40945cb1e35
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/spec.json
@@ -0,0 +1,34 @@
+{
+ "version" : "6.0.0",
+ "timestamp" : 1584548264774,
+ "schemas" : {
+ "CSAS_OUTPUT_0.KafkaTopic_Left.Source" : "STRUCT NOT NULL",
+ "CSAS_OUTPUT_0.KafkaTopic_Right.Source" : "STRUCT NOT NULL",
+ "CSAS_OUTPUT_0.Join.Left" : "STRUCT NOT NULL",
+ "CSAS_OUTPUT_0.Join.Right" : "STRUCT NOT NULL",
+ "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL"
+ },
+ "inputs" : [ {
+ "topic" : "left_topic",
+ "key" : "",
+ "value" : {
+ "id" : 1.0
+ },
+ "timestamp" : 10
+ }, {
+ "topic" : "right_topic",
+ "key" : "",
+ "value" : {
+ "id" : 1
+ },
+ "timestamp" : 11
+ } ],
+ "outputs" : [ {
+ "topic" : "OUTPUT",
+ "key" : 1,
+ "value" : {
+ "L_ID" : 1.0
+ },
+ "timestamp" : 11
+ } ]
+}
\ No newline at end of file
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/topology
new file mode 100644
index 000000000000..239bf5b7d656
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_stream_stream_join_-_contains_CAST_double_to_int_-_reversed/6.0.0_1584548264774/topology
@@ -0,0 +1,69 @@
+Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [left_topic])
+ --> KSTREAM-TRANSFORMVALUES-0000000001
+ Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
+ --> KSTREAM-FILTER-0000000002
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KSTREAM-FILTER-0000000002 (stores: [])
+ --> KSTREAM-KEY-SELECT-0000000003
+ <-- KSTREAM-TRANSFORMVALUES-0000000001
+ Processor: KSTREAM-KEY-SELECT-0000000003 (stores: [])
+ --> PrependAliasLeft
+ <-- KSTREAM-FILTER-0000000002
+ Processor: PrependAliasLeft (stores: [])
+ --> Join-left-repartition-filter
+ <-- KSTREAM-KEY-SELECT-0000000003
+ Processor: Join-left-repartition-filter (stores: [])
+ --> Join-left-repartition-sink
+ <-- PrependAliasLeft
+ Sink: Join-left-repartition-sink (topic: Join-left-repartition)
+ <-- Join-left-repartition-filter
+
+ Sub-topology: 1
+ Source: KSTREAM-SOURCE-0000000005 (topics: [right_topic])
+ --> KSTREAM-TRANSFORMVALUES-0000000006
+ Processor: KSTREAM-TRANSFORMVALUES-0000000006 (stores: [])
+ --> KSTREAM-FILTER-0000000007
+ <-- KSTREAM-SOURCE-0000000005
+ Processor: KSTREAM-FILTER-0000000007 (stores: [])
+ --> KSTREAM-KEY-SELECT-0000000008
+ <-- KSTREAM-TRANSFORMVALUES-0000000006
+ Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
+ --> PrependAliasRight
+ <-- KSTREAM-FILTER-0000000007
+ Processor: PrependAliasRight (stores: [])
+ --> Join-right-repartition-filter
+ <-- KSTREAM-KEY-SELECT-0000000008
+ Processor: Join-right-repartition-filter (stores: [])
+ --> Join-right-repartition-sink
+ <-- PrependAliasRight
+ Sink: Join-right-repartition-sink (topic: Join-right-repartition)
+ <-- Join-right-repartition-filter
+
+ Sub-topology: 2
+ Source: Join-left-repartition-source (topics: [Join-left-repartition])
+ --> Join-this-windowed
+ Source: Join-right-repartition-source (topics: [Join-right-repartition])
+ --> Join-other-windowed
+ Processor: Join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])
+ --> Join-other-join
+ <-- Join-right-repartition-source
+ Processor: Join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])
+ --> Join-this-join
+ <-- Join-left-repartition-source
+ Processor: Join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])
+ --> Join-merge
+ <-- Join-other-windowed
+ Processor: Join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])
+ --> Join-merge
+ <-- Join-this-windowed
+ Processor: Join-merge (stores: [])
+ --> Project
+ <-- Join-this-join, Join-other-join
+ Processor: Project (stores: [])
+ --> KSTREAM-SINK-0000000022
+ <-- Join-merge
+ Sink: KSTREAM-SINK-0000000022 (topic: OUTPUT)
+ <-- Project
+
diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json
index bc1316a60260..2714c0991ad0 100644
--- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json
+++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json
@@ -1,5 +1,57 @@
{
"tests": [
+ {
+ "name": "matching columns in both sides = select *",
+ "statements": [
+ "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');",
+ "CREATE STREAM R (A INT KEY, B INT, C INT) WITH (kafka_topic='RIGHT', value_format='JSON');",
+ "CREATE STREAM OUTPUT AS SELECT * FROM L INNER JOIN R WITHIN 10 SECONDS ON L.A = R.A;"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [
+ {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 10},
+ {"topic": "RIGHT", "key": 0, "value": {"B": -1, "C": -2}, "timestamp": 11}
+ ],
+ "outputs": [
+ {"topic": "OUTPUT", "key": 0, "value": {"L_ROWTIME": 10, "R_ROWTIME": 11, "L_A": 0, "R_A": 0, "L_B": 1, "R_B": -1, "L_C": 2, "R_C": -2}, "timestamp": 11}
+ ],
+ "post": {
+ "sources": [
+ {"name": "OUTPUT", "type": "stream", "schema": "A INT KEY, L_ROWTIME BIGINT, L_A INT, L_B INT, L_C INT, R_ROWTIME BIGINT, R_A INT, R_B INT, R_C INT"}
+ ],
+ "topics": {
+ "blacklist": ".*-repartition"
+ }
+ }
+ },
+ {
+ "name": "matching columns in both sides = select left.* and right.*",
+ "statements": [
+ "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');",
+ "CREATE STREAM R (A INT KEY, B INT, C INT) WITH (kafka_topic='RIGHT', value_format='JSON');",
+ "CREATE STREAM OUTPUT AS SELECT l.*, r.* FROM L INNER JOIN R WITHIN 10 SECONDS ON L.A = R.A;"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [
+ {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 10},
+ {"topic": "RIGHT", "key": 0, "value": {"B": -1, "C": -2}, "timestamp": 11}
+ ],
+ "outputs": [
+ {"topic": "OUTPUT", "key": 0, "value": {"L_ROWTIME": 10, "R_ROWTIME": 11, "L_A": 0, "R_A": 0, "L_B": 1, "R_B": -1, "L_C": 2, "R_C": -2}, "timestamp": 11}
+ ],
+ "post": {
+ "sources": [
+ {"name": "OUTPUT", "type": "stream", "schema": "A INT KEY, L_ROWTIME BIGINT, L_A INT, L_B INT, L_C INT, R_ROWTIME BIGINT, R_A INT, R_B INT, R_C INT"}
+ ],
+ "topics": {
+ "blacklist": ".*-repartition"
+ }
+ }
+ },
{
"name": "stream-stream left join",
"format": ["AVRO", "JSON"],
@@ -104,7 +156,7 @@
}
},
{
- "name": "stream-stream left join with rowkey - rekey",
+ "name": "stream-stream left join with key in projection - rekey",
"format": ["AVRO", "JSON"],
"statements": [
"CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}');",
@@ -135,12 +187,12 @@
],
"post": {
"sources": [
- {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ROWKEY BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"}
+ {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ID BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"}
]
}
},
{
- "name": "stream-stream left join with rowkey - rekey",
+ "name": "stream-stream left join with key in projection - rekey",
"format": ["PROTOBUF"],
"statements": [
"CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}');",
@@ -171,7 +223,7 @@
],
"post": {
"sources": [
- {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ROWKEY BIGINT KEY, T_ID BIGINT, T_K STRING, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"}
+ {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ID BIGINT KEY, T_ID BIGINT, T_K STRING, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"}
]
}
},
@@ -223,7 +275,7 @@
],
"post": {
"sources": [
- {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ROWKEY BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"}
+ {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ID BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"}
]
}
},
@@ -275,7 +327,7 @@
],
"post": {
"sources": [
- {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ROWKEY BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"}
+ {"name": "LEFT_OUTER_JOIN", "type": "stream", "schema": "ID BIGINT KEY, T_ID BIGINT, NAME STRING, VALUE BIGINT, F1 STRING, F2 BIGINT"}
]
}
},
@@ -1216,7 +1268,7 @@
{"topic": "right_topic", "key": 1, "value": {"x": 3}, "timestamp": 10}
],
"outputs": [
- {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-right-repartition", "key": 1, "value": {"TT_X": 3, "TT_ROWTIME": 10, "TT_ID": 1}, "timestamp": 10},
+ {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-right-repartition", "key": 1, "value": {"TT_X": 3, "TT_ROWTIME": 10, "TT_ID": 1, "TT_KSQL_COL_0": 1}, "timestamp": 10},
{"topic": "OUTPUT", "key": 1, "value": {"T_X": 2}, "timestamp": 10}
]
},
@@ -1238,6 +1290,25 @@
{"topic": "OUTPUT", "key": 1, "value": {"L_X": 2}, "timestamp": 11}
]
},
+ {
+ "name": "stream-stream join on expression where schema contains KSQL_COL_xx column names",
+ "statements": [
+ "CREATE STREAM TEST1 (KSQL_COL_0 bigint KEY, KSQL_COL_2 bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
+ "CREATE STREAM TEST2 (KSQL_COL_1 int KEY, KSQL_COL_3 int) WITH (kafka_topic='right_topic', value_format='JSON');",
+ "CREATE STREAM OUTPUT as SELECT t.KSQL_COL_2 FROM test1 t JOIN test2 tt WITHIN 30 seconds ON t.KSQL_COL_0 = CAST(tt.KSQL_COL_1 AS BIGINT);"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [
+ {"topic": "left_topic", "key": 1, "value": {"KSQL_COL_2": 2}, "timestamp": 10},
+ {"topic": "right_topic", "key": 1, "value": {"KSQL_COL_3": 3}, "timestamp": 10}
+ ],
+ "outputs": [
+ {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-right-repartition", "key": 1, "value": {"TT_KSQL_COL_3": 3, "TT_ROWTIME": 10, "TT_KSQL_COL_1": 1, "TT_KSQL_COL_4": 1}, "timestamp": 10},
+ {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_2": 2}, "timestamp": 10}
+ ]
+ },
{
"name": "stream-stream join - contains subscript",
"statements": [
@@ -1420,7 +1491,7 @@
"name": "OUTPUT",
"type": "stream",
"keyFormat": {"format": "KAFKA"},
- "schema": "ROWKEY BIGINT KEY, S_ROWTIME BIGINT, S_K STRING, S_SF BIGINT, T_ROWTIME BIGINT, T_ID BIGINT, T_TF INT"
+ "schema": "SF BIGINT KEY, S_ROWTIME BIGINT, S_K STRING, S_SF BIGINT, T_ROWTIME BIGINT, T_ID BIGINT, T_TF INT"
}
]
}
@@ -1460,7 +1531,7 @@
],
"post": {
"sources": [
- {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY INT KEY, L_ID STRING, L1 INT, R1 INT"}
+ {"name": "OUTPUT", "type": "stream", "schema": "L0 INT KEY, L_ID STRING, L1 INT, R1 INT"}
]
}
},
@@ -1484,7 +1555,7 @@
],
"post": {
"sources": [
- {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY BIGINT KEY, L_ID STRING, L1 INT, R1 INT"}
+ {"name": "OUTPUT", "type": "stream", "schema": "L0 BIGINT KEY, L_ID STRING, L1 INT, R1 INT"}
]
}
},
@@ -1508,7 +1579,7 @@
],
"post": {
"sources": [
- {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY DOUBLE KEY, L_ID STRING, L1 INT, R1 INT"}
+ {"name": "OUTPUT", "type": "stream", "schema": "L0 DOUBLE KEY, L_ID STRING, L1 INT, R1 INT"}
]
}
},
@@ -1532,7 +1603,7 @@
],
"post": {
"sources": [
- {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, L_ID STRING, L1 INT, R1 INT"}
+ {"name": "OUTPUT", "type": "stream", "schema": "L0 STRING KEY, L_ID STRING, L1 INT, R1 INT"}
]
}
},
@@ -1640,7 +1711,7 @@
}
},
{
- "name": "windowed vand non-windowed - INT",
+ "name": "windowed and non-windowed - INT",
"statements": [
"CREATE STREAM S1 (ID INT KEY, V bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM S2 (ID INT KEY, V bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
@@ -3527,6 +3598,21 @@
{"topic": "OUTPUT", "key": 1, "value": {"L_ID": 1}, "timestamp": 11}
]
},
+ {
+ "name": "stream stream join - contains CAST double to int - reversed",
+ "statements": [
+ "CREATE STREAM L (ID DOUBLE) WITH (kafka_topic='left_topic', value_format='JSON');",
+ "CREATE STREAM R (ID INT) WITH (kafka_topic='right_topic', value_format='JSON');",
+ "CREATE STREAM OUTPUT as SELECT L.ID FROM L JOIN R WITHIN 30 seconds ON CAST(L.id AS INT) = R.id;"
+ ],
+ "inputs": [
+ {"topic": "left_topic", "value": {"id": 1.0}, "timestamp": 10},
+ {"topic": "right_topic", "value": {"id": 1}, "timestamp": 11}
+ ],
+ "outputs": [
+ {"topic": "OUTPUT", "key": 1, "value": {"L_ID": 1.0}, "timestamp": 11}
+ ]
+ },
{
"name": "stream stream join - contains subscript",
"statements": [
diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json
index 5879cef22ba7..b48ad07b4f2b 100644
--- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json
+++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json
@@ -1,5 +1,141 @@
{
"tests": [
+ {
+ "name": "only key column - select star",
+ "statements": [
+ "CREATE STREAM INPUT (ID INT KEY, NAME STRING) with (kafka_topic='input', value_format='JSON');",
+ "CREATE STREAM OUTPUT AS select * from INPUT partition by ID;"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [{"topic": "input", "key": 10, "value": {"NAME": "bob"}}],
+ "outputs": [{"topic": "OUTPUT", "key": 10, "value": {"NAME": "bob"}}],
+ "post": {
+ "topics": {
+ "blacklist": ".*-repartition"
+ },
+ "sources": [
+ {"name": "OUTPUT", "type": "stream", "schema": "ID INT KEY, NAME STRING"}
+ ]
+ }
+ },
+ {
+ "name": "single value column - select star",
+ "statements": [
+ "CREATE STREAM INPUT (ID INT KEY, NAME STRING, AGE INT) with (kafka_topic='input', value_format='JSON');",
+ "CREATE STREAM OUTPUT AS select * from INPUT partition by NAME;"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [{"topic": "input", "key": 10, "value": {"NAME": "bob", "AGE": 22}}],
+ "outputs": [{"topic": "OUTPUT", "key": "bob", "value": {"ID": 10, "AGE": 22}}],
+ "post": {
+ "topics": {
+ "blacklist": ".*-repartition"
+ },
+ "sources": [
+ {"name": "OUTPUT", "type": "stream", "schema": "NAME STRING KEY, AGE INT, ID INT"}
+ ]
+ }
+ },
+ {
+ "name": "expression - select star",
+ "statements": [
+ "CREATE STREAM INPUT (ID INT KEY, NAME STRING) with (kafka_topic='input', value_format='JSON');",
+ "CREATE STREAM OUTPUT AS select * from INPUT partition by ABS(ID);"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [{"topic": "input", "key": 10, "value": {"NAME": "bob"}}],
+ "outputs": [{"topic": "OUTPUT", "key": 10, "value": {"NAME": "bob", "ID": 10}}],
+ "post": {
+ "topics": {
+ "blacklist": ".*-repartition"
+ },
+ "sources": [
+ {"name": "OUTPUT", "type": "stream", "schema": "KSQL_COL_0 INT KEY, NAME STRING, ID INT"}
+ ]
+ }
+ },
+ {
+ "name": "only key column - select columns",
+ "statements": [
+ "CREATE STREAM INPUT (ID INT KEY, NAME STRING, OTHER INT) WITH (kafka_topic='test_topic', value_format='JSON');",
+ "CREATE STREAM OUTPUT AS SELECT NAME FROM INPUT PARTITION BY ID;"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [
+ {"topic": "test_topic", "key": 11, "value": {"name": "a"}},
+ {"topic": "test_topic", "key": 10, "value": {"name": "b"}},
+ {"topic": "test_topic", "key": 11, "value": {"name": "c"}}
+ ],
+ "outputs": [
+ {"topic": "OUTPUT", "key": 11, "value": {"NAME": "a"}},
+ {"topic": "OUTPUT", "key": 10, "value": {"NAME": "b"}},
+ {"topic": "OUTPUT", "key": 11, "value": {"NAME": "c"}}
+ ],
+ "post": {
+ "sources": [
+ {
+ "name": "OUTPUT",
+ "type": "stream",
+ "keyFormat": {"format": "KAFKA"},
+ "schema": "ID INT KEY, NAME STRING"
+ }
+ ]
+ }
+ },
+ {
+ "name": "only key column - select star - with join on keys",
+ "statements": [
+ "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');",
+ "CREATE STREAM R (X INT KEY, Y INT, Z INT) WITH (kafka_topic='RIGHT', value_format='JSON');",
+ "CREATE STREAM OUTPUT AS SELECT * FROM L JOIN R WITHIN 10 SECONDS ON L.A = R.X PARTITION BY L.B;"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [
+ {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 11},
+ {"topic": "RIGHT", "key": 0, "value": {"Y": -1, "Z": -2}, "timestamp": 12}
+ ],
+ "outputs": [
+ {"topic": "OUTPUT", "key": 1, "value": {"L_ROWTIME": 11, "R_ROWTIME": 12, "L_A": 0, "R_X": 0, "R_Y": -1, "L_C": 2, "R_Z": -2}}
+ ],
+ "post": {
+ "sources": [
+ {"name": "OUTPUT", "type": "stream", "schema": "L_B INT KEY, L_C INT, L_ROWTIME BIGINT, L_A INT, R_Y INT, R_Z INT, R_ROWTIME BIGINT, R_X INT"}
+ ]
+ }
+ },
+ {
+ "name": "only key column - select star - with join on value columns",
+ "statements": [
+ "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');",
+ "CREATE STREAM R (X INT KEY, Y INT, Z INT) WITH (kafka_topic='RIGHT', value_format='JSON');",
+ "CREATE STREAM OUTPUT AS SELECT * FROM L JOIN R WITHIN 10 SECONDS ON L.B = R.Y PARTITION BY A;"
+ ],
+ "properties": {
+ "ksql.any.key.name.enabled": true
+ },
+ "inputs": [
+ {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 11},
+ {"topic": "RIGHT", "key": -1, "value": {"Y": 1, "Z": -2}, "timestamp": 12}
+ ],
+ "outputs": [
+ {"topic": "OUTPUT", "key": 0, "value": {"L_ROWTIME": 11, "R_ROWTIME": 12, "R_X": -1, "L_B": 1, "R_Y": 1, "L_C": 2, "R_Z": -2}}
+ ],
+ "post": {
+ "sources": [
+ {"name": "OUTPUT", "type": "stream", "schema": "L_A INT KEY, L_B INT, L_C INT, L_ROWTIME BIGINT, R_Y INT, R_Z INT, R_ROWTIME BIGINT, R_X INT"}
+ ]
+ }
+ },
{
"name": "partition by with projection select some",
"statements": [
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlPlanSchemaTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlPlanSchemaTest.java
index c7b17221a973..1036b5f3a9b9 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlPlanSchemaTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlPlanSchemaTest.java
@@ -44,7 +44,7 @@ public void shouldBuildSameSchemaForKsqlPlan() throws IOException {
"Detected a change to the schema of the KSQL physical plan. This is dangerous. "
+ "It means that KSQL may no longer be able to read and execute older plans. "
+ "If you're sure that your change is backwards-compatible, then please regenerate "
- + "the schema using KsqlPlanSchemaGenerator and update schema.json.",
+ + "the schema.json by running KsqlPlanSchemaTest.runMeToRegenerateSchemaFile().",
jsonSchema,
is(expected)
);
diff --git a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json
index 3fde5a933798..2d018d4b4e27 100644
--- a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json
+++ b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json
@@ -113,7 +113,9 @@
},
"properties" : {
"type" : "object",
- "additionalProperties": {"type": "string"}
+ "additionalProperties" : {
+ "type" : "string"
+ }
}
},
"required" : [ "format" ]
@@ -405,7 +407,7 @@
"title" : "streamSelectV1",
"required" : [ "@type", "properties", "source", "selectExpressions" ]
},
- "StreamSelectKey" : {
+ "StreamSelectKeyV1" : {
"type" : "object",
"additionalProperties" : false,
"properties" : {
@@ -427,6 +429,28 @@
"title" : "streamSelectKeyV1",
"required" : [ "@type", "properties", "source", "keyExpression" ]
},
+ "StreamSelectKey" : {
+ "type" : "object",
+ "additionalProperties" : false,
+ "properties" : {
+ "@type" : {
+ "type" : "string",
+ "enum" : [ "streamSelectKeyV2" ],
+ "default" : "streamSelectKeyV2"
+ },
+ "properties" : {
+ "$ref" : "#/definitions/ExecutionStepPropertiesV1"
+ },
+ "source" : {
+ "$ref" : "#/definitions/ExecutionStep"
+ },
+ "keyExpression" : {
+ "type" : "string"
+ }
+ },
+ "title" : "streamSelectKeyV2",
+ "required" : [ "@type", "properties", "source", "keyExpression" ]
+ },
"StreamSink" : {
"type" : "object",
"additionalProperties" : false,
@@ -853,6 +877,8 @@
"$ref" : "#/definitions/StreamGroupByKey"
}, {
"$ref" : "#/definitions/StreamSelect"
+ }, {
+ "$ref" : "#/definitions/StreamSelectKeyV1"
}, {
"$ref" : "#/definitions/StreamSelectKey"
}, {
diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java
index 1309e6284527..357e5ab3f36f 100644
--- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java
+++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java
@@ -34,6 +34,7 @@
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.execution.plan.StreamSelect;
import io.confluent.ksql.execution.plan.StreamSelectKey;
+import io.confluent.ksql.execution.plan.StreamSelectKeyV1;
import io.confluent.ksql.execution.plan.StreamSink;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.StreamStreamJoin;
@@ -233,6 +234,15 @@ public static StreamStreamJoin streamStreamJoin(
);
}
+ public static StreamSelectKeyV1 streamSelectKeyV1(
+ final QueryContext.Stacker stacker,
+ final ExecutionStep extends KStreamHolder>> source,
+ final Expression fieldName
+ ) {
+ final QueryContext queryContext = stacker.getQueryContext();
+ return new StreamSelectKeyV1(new ExecutionStepPropertiesV1(queryContext), source, fieldName);
+ }
+
public static StreamSelectKey streamSelectKey(
final QueryContext.Stacker stacker,
final ExecutionStep extends KStreamHolder>> source,
diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java
index be7219027455..b801925e3c0e 100644
--- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java
+++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java
@@ -28,6 +28,7 @@
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.execution.plan.StreamSelect;
import io.confluent.ksql.execution.plan.StreamSelectKey;
+import io.confluent.ksql.execution.plan.StreamSelectKeyV1;
import io.confluent.ksql.execution.plan.StreamSink;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.StreamStreamJoin;
@@ -135,7 +136,16 @@ public KStreamHolder visitFlatMap(final StreamFlatMap streamFlatMap) {
@Override
public KStreamHolder visitStreamSelectKey(
- final StreamSelectKey streamSelectKey) {
+ final StreamSelectKeyV1 streamSelectKey
+ ) {
+ final KStreamHolder> source = streamSelectKey.getSource().build(this);
+ return StreamSelectKeyBuilderV1.build(source, streamSelectKey, queryBuilder);
+ }
+
+ @Override
+ public KStreamHolder visitStreamSelectKey(
+ final StreamSelectKey streamSelectKey
+ ) {
final KStreamHolder> source = streamSelectKey.getSource().build(this);
return StreamSelectKeyBuilder.build(source, streamSelectKey, queryBuilder);
}
diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParams.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParams.java
new file mode 100644
index 000000000000..138b2f2cc036
--- /dev/null
+++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParams.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.streams;
+
+import static java.util.Objects.requireNonNull;
+
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.streams.KeyValue;
+
+public final class PartitionByParams {
+
+ private final LogicalSchema schema;
+ private BiPredicate predicate;
+ private BiFunction> mapper;
+
+ public PartitionByParams(
+ final LogicalSchema schema,
+ final BiPredicate predicate,
+ final BiFunction> mapper
+ ) {
+ this.schema = requireNonNull(schema, "schema");
+ this.predicate = requireNonNull(predicate, "predicate");
+ this.mapper = requireNonNull(mapper, "mapper");
+ }
+
+ public LogicalSchema getSchema() {
+ return schema;
+ }
+
+ public BiPredicate getPredicate() {
+ return predicate;
+ }
+
+ public BiFunction> getMapper() {
+ return mapper;
+ }
+}
diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java
new file mode 100644
index 000000000000..5e318f05207f
--- /dev/null
+++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.streams;
+
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.execution.codegen.CodeGenRunner;
+import io.confluent.ksql.execution.codegen.ExpressionMetadata;
+import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
+import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.execution.util.EngineProcessingLogMessageFactory;
+import io.confluent.ksql.execution.util.ExpressionTypeManager;
+import io.confluent.ksql.execution.util.StructKeyUtil;
+import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
+import io.confluent.ksql.function.FunctionRegistry;
+import io.confluent.ksql.logging.processing.ProcessingLogger;
+import io.confluent.ksql.name.ColumnName;
+import io.confluent.ksql.name.ColumnNames;
+import io.confluent.ksql.schema.ksql.Column;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
+import io.confluent.ksql.schema.ksql.types.SqlType;
+import io.confluent.ksql.util.KsqlConfig;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.streams.KeyValue;
+
+/**
+ * Factory for PartitionByParams.
+ *
+ *