Skip to content

Commit

Permalink
feat(static): support ROWKEY in the projection of static queries (#3439)
Browse files Browse the repository at this point in the history
* feat(static): support ROWKEY in the projection of static queries
  • Loading branch information
big-andy-coates authored Oct 2, 2019
1 parent 502c662 commit 9218766
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public Void visitColumnReference(
final ColumnReferenceExp node,
final ExpressionTypeContext expressionTypeContext
) {
final Column schemaColumn = schema.findValueColumn(node.getReference().toString())
final Column schemaColumn = schema.findColumn(node.getReference().name())
.orElseThrow(() ->
new KsqlException(String.format("Invalid Expression %s.", node.toString())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression.Sign;
import io.confluent.ksql.execution.expression.tree.Cast;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.DoubleLiteral;
import io.confluent.ksql.execution.expression.tree.Expression;
Expand All @@ -44,9 +45,6 @@
import io.confluent.ksql.execution.expression.tree.InPredicate;
import io.confluent.ksql.execution.expression.tree.IntegerLiteral;
import io.confluent.ksql.execution.expression.tree.LikePredicate;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.SearchedCaseExpression;
import io.confluent.ksql.execution.expression.tree.SimpleCaseExpression;
import io.confluent.ksql.execution.expression.tree.StringLiteral;
Expand All @@ -58,12 +56,13 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlFunction;
import io.confluent.ksql.function.UdfFactory;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.Operator;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.connect.data.Schema;
import org.junit.Before;
import org.junit.Rule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.execution.util;

import static io.confluent.ksql.execution.testutil.TestExpressions.ADDRESS;
import static io.confluent.ksql.execution.testutil.TestExpressions.COL0;
import static io.confluent.ksql.execution.testutil.TestExpressions.COL1;
import static io.confluent.ksql.execution.testutil.TestExpressions.COL2;
import static io.confluent.ksql.execution.testutil.TestExpressions.COL3;
Expand Down Expand Up @@ -54,6 +53,7 @@
import io.confluent.ksql.execution.expression.tree.TimestampLiteral;
import io.confluent.ksql.execution.expression.tree.WhenClause;
import io.confluent.ksql.execution.function.udf.structfieldextractor.FetchFieldFromStruct;
import io.confluent.ksql.execution.testutil.TestExpressions;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlFunction;
import io.confluent.ksql.function.UdfFactory;
Expand All @@ -79,6 +79,10 @@

@SuppressWarnings("OptionalGetWithoutIsPresent")
public class ExpressionTypeManagerTest {

private static final SourceName TEST1 = SourceName.of("TEST1");
private static final ColumnName COL0 = ColumnName.of("COL0");

@Mock
private FunctionRegistry functionRegistry;
@Mock
Expand Down Expand Up @@ -116,7 +120,8 @@ private void givenUdfWithNameAndReturnType(

@Test
public void shouldResolveTypeForAddBigIntDouble() {
final Expression expression = new ArithmeticBinaryExpression(Operator.ADD, COL0, COL3);
final Expression expression = new ArithmeticBinaryExpression(Operator.ADD, TestExpressions.COL0,
COL3);

final SqlType type = expressionTypeManager.getExpressionSqlType(expression);

Expand All @@ -134,7 +139,8 @@ public void shouldResolveTypeForAddDoubleIntegerLiteral() {

@Test
public void shouldResolveTypeForAddBigintIntegerLiteral() {
final Expression expression = new ArithmeticBinaryExpression(Operator.ADD, COL0, literal(10));
final Expression expression = new ArithmeticBinaryExpression(Operator.ADD, TestExpressions.COL0,
literal(10));

final SqlType type = expressionTypeManager.getExpressionSqlType(expression);

Expand All @@ -144,7 +150,7 @@ public void shouldResolveTypeForAddBigintIntegerLiteral() {
@Test
public void shouldResolveTypeForMultiplyBigintIntegerLiteral() {
final Expression expression =
new ArithmeticBinaryExpression(Operator.MULTIPLY, COL0, literal(10));
new ArithmeticBinaryExpression(Operator.MULTIPLY, TestExpressions.COL0, literal(10));

final SqlType type = expressionTypeManager.getExpressionSqlType(expression);

Expand All @@ -153,7 +159,8 @@ public void shouldResolveTypeForMultiplyBigintIntegerLiteral() {

@Test
public void testComparisonExpr() {
final Expression expression = new ComparisonExpression(Type.GREATER_THAN, COL0, COL3);
final Expression expression = new ComparisonExpression(Type.GREATER_THAN, TestExpressions.COL0,
COL3);

final SqlType exprType = expressionTypeManager.getExpressionSqlType(expression);

Expand All @@ -163,7 +170,8 @@ public void testComparisonExpr() {
@Test
public void shouldFailIfComparisonOperandsAreIncompatible() {
// Given:
final ComparisonExpression expr = new ComparisonExpression(Type.GREATER_THAN, COL0, COL1);
final ComparisonExpression expr = new ComparisonExpression(Type.GREATER_THAN,
TestExpressions.COL0, COL1);
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Operator GREATER_THAN cannot be used to compare BIGINT and STRING");

Expand Down Expand Up @@ -283,7 +291,7 @@ public void shouldThrowOnStructFieldDereference() {
// Given:
final Expression expression = new DereferenceExpression(
Optional.empty(),
new ColumnReferenceExp(ColumnRef.of(SourceName.of("TEST1"), ColumnName.of("COL6"))),
new ColumnReferenceExp(ColumnRef.of(TEST1, ColumnName.of("COL6"))),
"STREET"
);

Expand Down Expand Up @@ -322,10 +330,11 @@ public void shouldEvaluateTypeForStructDereferenceInArray() {
// Given:
final SqlStruct inner = SqlTypes.struct().field("IN0", SqlTypes.INTEGER).build();
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(ColumnName.of("TEST1.COL0"), SqlTypes.array(inner))
.valueColumn(TEST1, COL0, SqlTypes.array(inner))
.build();
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);
final Expression arrayRef = new SubscriptExpression(COL0, new IntegerLiteral(1));
final Expression arrayRef = new SubscriptExpression(TestExpressions.COL0,
new IntegerLiteral(1));
final Expression expression = new FunctionCall(
FunctionName.of(FetchFieldFromStruct.FUNCTION_NAME),
ImmutableList.of(arrayRef, new StringLiteral("IN0"))
Expand All @@ -343,12 +352,12 @@ public void shouldEvaluateTypeForArrayReferenceInStruct() {
.field("IN0", SqlTypes.array(SqlTypes.INTEGER))
.build();
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(ColumnName.of("TEST1.COL0"), inner)
.valueColumn(TEST1, COL0, inner)
.build();
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);
final Expression structRef = new FunctionCall(
FunctionName.of(FetchFieldFromStruct.FUNCTION_NAME),
ImmutableList.of(COL0, new StringLiteral("IN0"))
ImmutableList.of(TestExpressions.COL0, new StringLiteral("IN0"))
);
final Expression expression = new SubscriptExpression(structRef, new IntegerLiteral(1));

Expand Down Expand Up @@ -387,7 +396,7 @@ public void shouldGetCorrectSchemaForSearchedCaseWhenStruct() {
final Expression expression = new SearchedCaseExpression(
ImmutableList.of(
new WhenClause(
new ComparisonExpression(Type.EQUAL, COL0, new IntegerLiteral(10)),
new ComparisonExpression(Type.EQUAL, TestExpressions.COL0, new IntegerLiteral(10)),
ADDRESS)
),
Optional.empty()
Expand All @@ -407,7 +416,8 @@ public void shouldFailIfWhenIsNotBoolean() {
final Expression expression = new SearchedCaseExpression(
ImmutableList.of(
new WhenClause(
new ArithmeticBinaryExpression(Operator.ADD, COL0, new IntegerLiteral(10)),
new ArithmeticBinaryExpression(Operator.ADD, TestExpressions.COL0,
new IntegerLiteral(10)),
new StringLiteral("foo"))
),
Optional.empty()
Expand All @@ -425,10 +435,10 @@ public void shouldFailOnInconsistentWhenResultType() {
final Expression expression = new SearchedCaseExpression(
ImmutableList.of(
new WhenClause(
new ComparisonExpression(Type.EQUAL, COL0, new IntegerLiteral(100)),
new ComparisonExpression(Type.EQUAL, TestExpressions.COL0, new IntegerLiteral(100)),
new StringLiteral("one-hundred")),
new WhenClause(
new ComparisonExpression(Type.EQUAL, COL0, new IntegerLiteral(10)),
new ComparisonExpression(Type.EQUAL, TestExpressions.COL0, new IntegerLiteral(10)),
new IntegerLiteral(10))
),
Optional.empty()
Expand All @@ -447,7 +457,7 @@ public void shouldFailIfDefaultHasDifferentTypeToWhen() {
final Expression expression = new SearchedCaseExpression(
ImmutableList.of(
new WhenClause(
new ComparisonExpression(Type.EQUAL, COL0, new IntegerLiteral(10)),
new ComparisonExpression(Type.EQUAL, TestExpressions.COL0, new IntegerLiteral(10)),
new StringLiteral("good"))
),
Optional.of(new BooleanLiteral("true"))
Expand Down Expand Up @@ -481,7 +491,7 @@ public void shouldThrowOnTimestampLiteral() {
public void shouldThrowOnIn() {
// Given:
final Expression expression = new InPredicate(
COL0,
TestExpressions.COL0,
new InListExpression(ImmutableList.of(new IntegerLiteral(1), new IntegerLiteral(2)))
);

Expand All @@ -495,7 +505,7 @@ public void shouldThrowOnIn() {
@Test
public void shouldThrowOnSimpleCase() {
final Expression expression = new SimpleCaseExpression(
COL0,
TestExpressions.COL0,
ImmutableList.of(new WhenClause(new IntegerLiteral(10), new StringLiteral("ten"))),
Optional.empty()
);
Expand Down
Loading

0 comments on commit 9218766

Please sign in to comment.