Skip to content

Commit

Permalink
feat: support quoted identifiers in column names (#3477)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Oct 10, 2019
1 parent c401ec0 commit be2bdcc
Show file tree
Hide file tree
Showing 19 changed files with 347 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.name;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.util.KsqlPreconditions;

/**
* The name of a column within a source.
Expand All @@ -31,13 +30,6 @@ public static ColumnName aggregate(final int idx) {
}

public static ColumnName of(final String name) {
KsqlPreconditions.checkServerCondition(!name.contains("."), "expected no aliased fields!");
return new ColumnName(name);
}

// this can be used to create a column name without validating that it doesn't
// have an alias - unfortunately this is necessary for our group by key creation :(
public static ColumnName withoutValidation(final String name) {
return new ColumnName(name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.StringUtil;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicyFactory;
import java.util.Objects;
Expand Down Expand Up @@ -152,8 +151,7 @@ private static Optional<ColumnName> buildKeyFieldName(
final CreateSource statement,
final LogicalSchema schema) {
if (statement.getProperties().getKeyField().isPresent()) {
final String name = statement.getProperties().getKeyField().get().toUpperCase();
final ColumnName columnName = ColumnName.of(StringUtil.cleanQuotes(name));
final ColumnName columnName = statement.getProperties().getKeyField().get().name();
schema.findValueColumn(ColumnRef.withoutSource(columnName)).orElseThrow(
() -> new KsqlException(
"The KEY column set in the WITH clause does not exist in the schema: '"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ ColumnRef groupedKeyNameFor(final List<Expression> groupByExpressions) {
// will never be in the original schema, so we're necessarily creating a
// new field
return ColumnRef.withoutSource(
ColumnName.withoutValidation(groupByExpressions.stream()
ColumnName.of(groupByExpressions.stream()
.map(Expression::toString)
.collect(Collectors.joining(GROUP_BY_COLUMN_SEPARATOR))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,14 +392,14 @@ public void shouldNotThrowIfTopicDoesExist() {
@Test
public void shouldThrowIfKeyFieldNotInSchemaForStream() {
// Given:
givenProperty(CreateConfigs.KEY_NAME_PROPERTY, new StringLiteral("will-not-find-me"));
givenProperty(CreateConfigs.KEY_NAME_PROPERTY, new StringLiteral("`will-not-find-me`"));
final CreateStream statement = new CreateStream(SOME_NAME, SOME_ELEMENTS, true, withProperties);

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"The KEY column set in the WITH clause does not exist in the schema: "
+ "'WILL-NOT-FIND-ME'");
+ "'will-not-find-me'");

// When:
createSourceFactory.createStreamCommand("expression", statement, ksqlConfig);
Expand All @@ -410,7 +410,7 @@ public void shouldThrowIfTimestampColumnDoesNotExistForStream() {
// Given:
givenProperty(
CommonCreateConfigs.TIMESTAMP_NAME_PROPERTY,
new StringLiteral("will-not-find-me")
new StringLiteral("`will-not-find-me`")
);
final CreateStream statement =
new CreateStream(SOME_NAME, SOME_ELEMENTS, true, withProperties);
Expand All @@ -419,7 +419,7 @@ public void shouldThrowIfTimestampColumnDoesNotExistForStream() {
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"The TIMESTAMP column set in the WITH clause does not exist in the schema: "
+ "'WILL-NOT-FIND-ME'");
+ "'will-not-find-me'");

// When:
createSourceFactory.createStreamCommand("expression", statement, ksqlConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.MetaStoreFixture;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -458,7 +457,7 @@ public void testGroupBy() {
assertThat(groupedSchemaKTable, instanceOf(SchemaKGroupedTable.class));
assertThat(groupedSchemaKTable.getKeyField().ref(), is(Optional.empty()));
assertThat(groupedSchemaKTable.getKeyField().legacy().map(LegacyField::columnRef),
is(Optional.of(ColumnRef.withoutSource(ColumnName.withoutValidation("TEST2.COL2|+|TEST2.COL1")))));
is(Optional.of(ColumnRef.withoutSource(ColumnName.of("TEST2.COL2|+|TEST2.COL1")))));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import io.confluent.ksql.metastore.model.KeyField.LegacyField;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.ColumnReferenceParser;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.test.utils.TestParsingUtil;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.hamcrest.FeatureMatcher;
Expand Down Expand Up @@ -100,7 +100,7 @@ private KeyFieldMatchers() {

public static Matcher<KeyField> hasName(final Optional<String> name) {
return new FeatureMatcher<KeyField, Optional<ColumnName>>(
is(name.map(TestParsingUtil::parseColumnRef).map(ColumnRef::name)),
is(name.map(ColumnReferenceParser::parse).map(ColumnRef::name)),
"field with name",
"name"
) {
Expand All @@ -113,7 +113,7 @@ protected Optional<ColumnName> featureValueOf(final KeyField actual) {

public static Matcher<KeyField> hasLegacyName(final Optional<String> name) {
return new FeatureMatcher<KeyField, Optional<ColumnName>>(
is(name.map(TestParsingUtil::parseColumnRef).map(ColumnRef::name)),
is(name.map(ColumnReferenceParser::parse).map(ColumnRef::name)),
"field with legacy name",
"legacy name") {
@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "table", "keyField": {"name": null, "legacyName": "KSQL_INTERNAL_COL_1|+|KSQL_INTERNAL_COL_0", "legacySchema": "STRING"}}
{"name": "OUTPUT", "type": "table", "keyField": {"name": null, "legacyName": "`KSQL_INTERNAL_COL_1|+|KSQL_INTERNAL_COL_0`", "legacySchema": "STRING"}}
]
}
},
Expand Down Expand Up @@ -1349,7 +1349,7 @@
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "table", "keyField": {"name": null, "legacyName": "KSQL_INTERNAL_COL_1|+|KSQL_INTERNAL_COL_0", "legacySchema": "STRING"}}
{"name": "OUTPUT", "type": "table", "keyField": {"name": null, "legacyName": "`KSQL_INTERNAL_COL_1|+|KSQL_INTERNAL_COL_0`", "legacySchema": "STRING"}}
]
}
},
Expand All @@ -1370,7 +1370,7 @@
"key field of output has INCORRECT name: should be 'KSQL_COL_0'. See https://github.com/confluentinc/ksql/issues/2740"
],
"sources": [
{"name": "OUTPUT", "type": "table", "keyField": {"name": null, "legacyName": "(KSQL_INTERNAL_COL_0 + KSQL_INTERNAL_COL_1)", "legacySchema": "STRING"}}
{"name": "OUTPUT", "type": "table", "keyField": {"name": null, "legacyName": "`(KSQL_INTERNAL_COL_0 + KSQL_INTERNAL_COL_1)`", "legacySchema": "STRING"}}
]
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
{
"comments": [
"Tests covering the use of quoted identifiers."
],
"tests": [
{
"name": "source fields that require quotes",
"statements": [
"CREATE STREAM TEST (`with.dot` VARCHAR, `*bad!chars*` VARCHAR, `SELECT` VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT * FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"with.dot": "popcorn", "*bad!chars*": "cheetos", "SELECT": "reserved"}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"with.dot": "popcorn", "*bad!chars*": "cheetos", "SELECT": "reserved"}}
]
},
{
"name": "sink fields that require quotes",
"statements": [
"CREATE STREAM TEST (a VARCHAR, b VARCHAR, c VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT A as `with.dot`, B as `*bad!chars*`, C as `SELECT` FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"A": "popcorn", "B": "cheetos", "C": "reserved"}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"with.dot": "popcorn", "*bad!chars*": "cheetos", "SELECT": "reserved"}}
]
},
{
"name": "udf using fields that require quotes",
"statements": [
"CREATE STREAM TEST (`SELECT` INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT ABS(`SELECT`) FOO FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"SELECT": -2}}
],
"outputs": [
{"topic": "OUTPUT", "value": { "FOO": 2.0}}
]
},
{
"name": "math using fields that require quotes",
"statements": [
"CREATE STREAM TEST (`SELECT` INT, `with.dot` INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT `SELECT` * `with.dot` AS FOO FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"with.dot": 1, "SELECT": 2}}
],
"outputs": [
{"topic": "OUTPUT", "value": { "FOO": 2}}
]
},
{
"name": "create table with key that is quoted",
"statements": [
"CREATE TABLE TEST (`some.key` VARCHAR) WITH (kafka_topic='test_topic', key='`some.key`', value_format='JSON');",
"CREATE TABLE OUTPUT AS SELECT * FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"some.key": "key"}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"some.key": "key"}}
],
"post": {
"sources": [
{"name": "TEST", "type": "table", "keyField": {"name": "`some.key`", "legacyName": "`some.key`"}},
{"name": "OUTPUT", "type": "table", "keyField": {"name": "`some.key`", "legacyName": "`some.key`"}}
]
}
},
{
"name": "joins using fields that require quotes",
"statements": [
"CREATE STREAM L (`SELECT` VARCHAR, `field!` VARCHAR) WITH (kafka_topic='left_topic', value_format='JSON');",
"CREATE TABLE R (`with.dot` VARCHAR) WITH (kafka_topic='right_topic', value_format='JSON', key='`with.dot`');",
"CREATE STREAM JOINED as SELECT L.`field!` FROM L LEFT JOIN R ON L.`SELECT` = R.`with.dot`;"
],
"inputs": [
{"topic": "left_topic", "value": {"SELECT": "1", "field!": "A"}},
{"topic": "right_topic", "value": {"with.dot": "1"}},
{"topic": "right_topic", "value": {"with.dot": "2"}}
],
"outputs": [
{"topic": "JOINED", "key": "1", "value": { "field!": "A"}}
]
}
]
}
29 changes: 8 additions & 21 deletions ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression;
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.execution.expression.tree.BetweenPredicate;
Expand Down Expand Up @@ -980,29 +981,15 @@ public Node visitDereference(final SqlBaseParser.DereferenceContext context) {

@Override
public Node visitColumnReference(final SqlBaseParser.ColumnReferenceContext context) {
final ColumnReferenceExp columnReferenceExp = ColumnReferenceParser.resolve(context);
final ColumnRef reference = columnReferenceExp.getReference();

final ColumnName columnName;
final SourceName prefixName;
final Optional<NodeLocation> columnLocation;
if (context.identifier(1) == null) {
prefixName = null;
columnName = ColumnName.of(ParserUtil.getIdentifierText(context.identifier(0)));
columnLocation = getLocation(context.identifier(0));
} else {
prefixName = SourceName.of(ParserUtil.getIdentifierText(context.identifier(0)));
columnName = ColumnName.of(ParserUtil.getIdentifierText(context.identifier(1)));
columnLocation = getLocation(context.identifier(1));
}

if (prefixName != null) {
throwOnUnknownNameOrAlias(prefixName);

return new ColumnReferenceExp(
getLocation(context),
ColumnRef.of(prefixName, columnName)
);
if (reference.source().isPresent()) {
throwOnUnknownNameOrAlias(reference.source().get());
return columnReferenceExp;
}

final ColumnName columnName = reference.name();
if (dataSourceExtractor.isJoin()) {
if (dataSourceExtractor.getCommonFieldNames().contains(columnName)) {
throw new KsqlException("Field '" + columnName.name() + "' is ambiguous.");
Expand All @@ -1023,7 +1010,7 @@ public Node visitColumnReference(final SqlBaseParser.ColumnReferenceContext cont
}

throw new InvalidColumnReferenceException(
columnLocation,
getLocation(Iterables.getLast(context.identifier())),
"Field '" + columnName.name() + "' cannot be resolved."
);
}
Expand Down
Loading

0 comments on commit be2bdcc

Please sign in to comment.