diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 176b74c3fddc..e382bd7256b9 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -408,7 +408,7 @@ private List extractSelectItems(final Select select, final Relation final List selectItems = new ArrayList<>(); for (final SelectItem selectItem : select.getSelectItems()) { if (selectItem instanceof AllColumns) { - selectItems.addAll(getSelectStarItems(selectItem, from)); + selectItems.addAll(getSelectStarItems((AllColumns) selectItem, from)); } else if (selectItem instanceof SingleColumn) { selectItems.add(selectItem); @@ -420,9 +420,8 @@ private List extractSelectItems(final Select select, final Relation return selectItems; } - private List getSelectStarItems(final SelectItem selectItem, final Relation from) { + private List getSelectStarItems(final AllColumns allColumns, final Relation from) { final List selectItems = new ArrayList<>(); - final AllColumns allColumns = (AllColumns) selectItem; final NodeLocation location = allColumns.getLocation().orElse(null); if (from instanceof Join) { @@ -436,7 +435,7 @@ private List getSelectStarItems(final SelectItem selectItem, final R throw new InvalidColumnReferenceException("Source for alias '" + allColumns.getPrefix().get() + "' doesn't exist"); } - addFieldsFromDataSource(selectItems, source, location, alias); + addFieldsFromDataSource(selectItems, source, location, alias, allColumns); } else { final AliasedRelation left = (AliasedRelation) join.getLeft(); final StructuredDataSource @@ -453,8 +452,10 @@ private List getSelectStarItems(final SelectItem selectItem, final R throw new InvalidColumnReferenceException(right.getRelation().toString() + " does not exist."); } - addFieldsFromDataSource(selectItems, leftDataSource, location, left.getAlias()); - addFieldsFromDataSource(selectItems, rightDataSource, location, right.getAlias()); + addFieldsFromDataSource( + selectItems, leftDataSource, location, left.getAlias(), allColumns); + addFieldsFromDataSource( + selectItems, rightDataSource, location, right.getAlias(), allColumns); } } else { final AliasedRelation fromRel = (AliasedRelation) from; @@ -471,7 +472,7 @@ private List getSelectStarItems(final SelectItem selectItem, final R new QualifiedNameReference(location, QualifiedName .of(fromDataSource.getName() + "." + field.name())); final SingleColumn newSelectItem = - new SingleColumn(qualifiedNameReference, field.name()); + new SingleColumn(qualifiedNameReference, field.name(), allColumns); selectItems.add(newSelectItem); } } @@ -481,7 +482,8 @@ private List getSelectStarItems(final SelectItem selectItem, final R private void addFieldsFromDataSource(final List selectItems, final StructuredDataSource dataSource, final NodeLocation location, - final String alias) { + final String alias, + final AllColumns source) { for (final Field field : dataSource.getSchema().fields()) { final QualifiedNameReference qualifiedNameReference = new QualifiedNameReference( @@ -490,7 +492,8 @@ private void addFieldsFromDataSource(final List selectItems, ); selectItems.add(new SingleColumn( qualifiedNameReference, - alias + "_" + field.name() + alias + "_" + field.name(), + source )); } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index c256262499a3..2880cb208642 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -66,6 +66,8 @@ import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; public final class SqlFormatter { @@ -76,15 +78,13 @@ private SqlFormatter() { } public static String formatSql(final Node root) { - final StringBuilder builder = new StringBuilder(); - new Formatter(builder, true).process(root, 0); - return builder.toString(); + return formatSql(root, true); } public static String formatSql(final Node root, final boolean unmangleNames) { final StringBuilder builder = new StringBuilder(); new Formatter(builder, unmangleNames).process(root, 0); - return builder.toString(); + return StringUtils.stripEnd(builder.toString(), "\n"); } private static final class Formatter @@ -130,9 +130,6 @@ protected Void visitQuerySpecification(final QuerySpecification node, final Inte append(indent, "FROM "); processRelation(node.getFrom(), indent); - builder.append('\n'); - append(indent, " "); - builder.append('\n'); if (node.getWhere().isPresent()) { @@ -168,9 +165,18 @@ protected Void visitSelect(final Select node, final Integer indent) { builder.append(" DISTINCT"); } - if (node.getSelectItems().size() > 1) { + final List selectItems = node.getSelectItems() + .stream() + .map(item -> + (item instanceof SingleColumn) + ? ((SingleColumn) item).getAllColumns().map(SelectItem.class::cast).orElse(item) + : item) + .distinct() + .collect(Collectors.toList()); + + if (selectItems.size() > 1) { boolean first = true; - for (final SelectItem item : node.getSelectItems()) { + for (final SelectItem item : selectItems) { builder.append("\n") .append(indentString(indent)) .append(first ? " " : ", "); @@ -180,7 +186,7 @@ protected Void visitSelect(final Select node, final Integer indent) { } } else { builder.append(' '); - process(getOnlyElement(node.getSelectItems()), indent); + process(getOnlyElement(selectItems), indent); } builder.append('\n'); diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriter.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriter.java index 758710592a86..5218c833a6a5 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriter.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriter.java @@ -549,19 +549,7 @@ protected Node visitNotExpression(final NotExpression node, final Object context } protected Node visitSingleColumn(final SingleColumn node, final Object context) { - // use an if/else block here (instead of isPresent.map(...).orElse(...)) so only one object - // gets instantiated (issue #1784) - if (node.getLocation().isPresent()) { - return new SingleColumn(node.getLocation().get(), - (Expression) process(node.getExpression(), context), - node.getAlias() - ); - } else { - return new SingleColumn( - (Expression) process(node.getExpression(), context), - node.getAlias() - ); - } + return node.copyWithExpression((Expression) process(node.getExpression(), context)); } protected Node visitAllColumns(final AllColumns node, final Object context) { diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/SingleColumn.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/SingleColumn.java index f2b842616345..ef4d5af2f264 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/SingleColumn.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/SingleColumn.java @@ -24,31 +24,47 @@ public class SingleColumn extends SelectItem { + private final Optional allColumns; private final Optional alias; private final Expression expression; public SingleColumn(final Expression expression) { - this(Optional.empty(), expression, Optional.empty()); + this(Optional.empty(), expression, Optional.empty(), Optional.empty()); } public SingleColumn(final Expression expression, final Optional alias) { - this(Optional.empty(), expression, alias); + this(Optional.empty(), expression, alias, Optional.empty()); } public SingleColumn(final Expression expression, final String alias) { - this(Optional.empty(), expression, Optional.of(alias)); + this(Optional.empty(), expression, Optional.of(alias), Optional.empty()); } public SingleColumn( final NodeLocation location, final Expression expression, final Optional alias) { - this(Optional.of(location), expression, alias); + this(Optional.of(location), expression, alias, Optional.empty()); } - private SingleColumn(final Optional location, final Expression expression, - final Optional alias) { + public SingleColumn( + final Expression expression, + final String alias, + final AllColumns allColumns) { + this(Optional.empty(), expression, Optional.of(alias), Optional.of(allColumns)); + } + + private SingleColumn(final SingleColumn other, final Expression expression) { + this(other.getLocation(), expression, other.alias, other.allColumns); + } + + private SingleColumn( + final Optional location, + final Expression expression, + final Optional alias, + final Optional allColumns) { super(location); requireNonNull(expression, "expression is null"); requireNonNull(alias, "alias is null"); + requireNonNull(allColumns, "allColumns is null"); alias.ifPresent(name -> { checkForReservedToken(expression, name, SchemaUtil.ROWTIME_NAME); @@ -57,6 +73,11 @@ private SingleColumn(final Optional location, final Expression exp this.expression = expression; this.alias = alias; + this.allColumns = allColumns; + } + + public SingleColumn copyWithExpression(final Expression expression) { + return new SingleColumn(this, expression); } private void checkForReservedToken( @@ -78,6 +99,15 @@ public Expression getExpression() { return expression; } + /** + * @return a reference to an {@code AllColumns} if this single column + * was expanded as part of a {@code SELECT *} Expression, otherwise + * returns an empty optional + */ + public Optional getAllColumns() { + return allColumns; + } + @Override public boolean equals(final Object obj) { if (this == obj) { @@ -87,22 +117,22 @@ public boolean equals(final Object obj) { return false; } final SingleColumn other = (SingleColumn) obj; - return Objects.equals(this.alias, other.alias) && Objects - .equals(this.expression, other.expression); + return Objects.equals(this.alias, other.alias) + && Objects.equals(this.expression, other.expression) + && Objects.equals(this.allColumns, other.allColumns); } @Override public int hashCode() { - return Objects.hash(alias, expression); + return Objects.hash(allColumns, alias, expression); } @Override public String toString() { - if (alias.isPresent()) { - return expression.toString() + " " + alias.get(); - } - - return expression.toString(); + return "SingleColumn{" + "allColumns=" + allColumns + + ", alias=" + alias + + ", expression=" + expression + + '}'; } @Override diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 2c2ada2f2fc0..c4cb524674b5 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -59,6 +59,7 @@ import io.confluent.ksql.parser.tree.QuerySpecification; import io.confluent.ksql.parser.tree.RegisterTopic; import io.confluent.ksql.parser.tree.SearchedCaseExpression; +import io.confluent.ksql.parser.tree.SelectItem; import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; @@ -70,10 +71,14 @@ import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -1089,8 +1094,8 @@ public void shouldAddPrefixEvenIfColumnNameIsTheSameAsStream() { final Query query = ((CreateStreamAsSelect) statement).getQuery(); assertThat(query.getQueryBody(), instanceOf(QuerySpecification.class)); final QuerySpecification querySpecification = (QuerySpecification) query.getQueryBody(); - assertThat(querySpecification.getSelect().getSelectItems().get(0).toString(), - equalTo("A.ADDRESS ADDRESS")); + assertThat(querySpecification.getSelect().getSelectItems().get(0), + equalToColumn("A.ADDRESS", "ADDRESS")); } @Test @@ -1104,8 +1109,8 @@ public void shouldNotAddPrefixIfStreamNameIsPrefix() { final Query query = ((CreateStreamAsSelect) statement).getQuery(); assertThat(query.getQueryBody(), instanceOf(QuerySpecification.class)); final QuerySpecification querySpecification = (QuerySpecification) query.getQueryBody(); - assertThat(querySpecification.getSelect().getSelectItems().get(0).toString(), - equalTo("ADDRESS.ORDERID ORDERID")); + assertThat(querySpecification.getSelect().getSelectItems().get(0), + equalToColumn("ADDRESS.ORDERID", "ORDERID")); } @Test @@ -1118,8 +1123,8 @@ public void shouldPassIfStreamColumnNameWithAliasIsNotAmbiguous() { final Query query = ((CreateStreamAsSelect) statement).getQuery(); assertThat(query.getQueryBody(), instanceOf(QuerySpecification.class)); final QuerySpecification querySpecification = (QuerySpecification) query.getQueryBody(); - assertThat(querySpecification.getSelect().getSelectItems().get(0).toString(), - equalTo("FETCH_FIELD_FROM_STRUCT(A.ADDRESS, 'CITY') ADDRESS__CITY")); + assertThat(querySpecification.getSelect().getSelectItems().get(0), + equalToColumn("FETCH_FIELD_FROM_STRUCT(A.ADDRESS, 'CITY')", "ADDRESS__CITY")); } @Test @@ -1132,8 +1137,12 @@ public void shouldPassIfStreamColumnNameIsNotAmbiguous() { final Query query = ((CreateStreamAsSelect) statement).getQuery(); assertThat(query.getQueryBody(), instanceOf(QuerySpecification.class)); final QuerySpecification querySpecification = (QuerySpecification) query.getQueryBody(); - assertThat(querySpecification.getSelect().getSelectItems().get(0).toString(), - equalTo("FETCH_FIELD_FROM_STRUCT(ADDRESS.ADDRESS, 'CITY') ADDRESS__CITY")); + + final SelectItem item = querySpecification.getSelect().getSelectItems().get(0); + assertThat(item, equalToColumn( + "FETCH_FIELD_FROM_STRUCT(ADDRESS.ADDRESS, 'CITY')", + "ADDRESS__CITY" + )); } @Test(expected = KsqlException.class) @@ -1153,7 +1162,8 @@ public void shouldPassJoinQueryParseIfStreamColumnNameWithAliasIsNotAmbiguous() final Query query = ((CreateStreamAsSelect) statement).getQuery(); assertThat(query.getQueryBody(), instanceOf(QuerySpecification.class)); final QuerySpecification querySpecification = (QuerySpecification) query.getQueryBody(); - assertThat(querySpecification.getSelect().getSelectItems().get(0).toString(), equalTo("ITEMID.ITEMID ITEMID_ITEMID")); + assertThat(querySpecification.getSelect().getSelectItems().get(0), + equalToColumn("ITEMID.ITEMID", "ITEMID_ITEMID")); } @Test @@ -1274,4 +1284,30 @@ private static SearchedCaseExpression getSearchedCaseExpressionFromCsas(final St return (SearchedCaseExpression) caseExpression; } + private static Matcher equalToColumn( + final String expression, + final String alias) { + return new TypeSafeMatcher() { + @Override + protected boolean matchesSafely(SelectItem item) { + if (!(item instanceof SingleColumn)) { + return false; + } + + SingleColumn column = (SingleColumn) item; + return Objects.equals(column.getExpression().toString(), expression) + && Objects.equals(column.getAlias().orElse(null), alias) + && Objects.equals(column.getAllColumns().isPresent(), false); + } + + @Override + public void describeTo(Description description) { + description.appendText( + String.format("Expression: %s, Alias: %s", + expression, + alias)); + } + }; + } + } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index 23a56e1c5123..0fd7455cd616 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -257,8 +257,69 @@ public void shouldFormatSelectQueryCorrectly() { final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) .getStatement(); assertThat(SqlFormatter.formatSql(statement), equalTo("CREATE STREAM S AS SELECT FETCH_FIELD_FROM_STRUCT(A.ADDRESS, 'CITY') \"ADDRESS__CITY\"\n" - + "FROM ADDRESS A\n" - + " \n")); + + "FROM ADDRESS A")); + } + + @Test + public void shouldFormatSelectStarCorrectly() { + final String statementString = "CREATE STREAM S AS SELECT * FROM address;"; + final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) + .getStatement(); + assertThat(SqlFormatter.formatSql(statement), + equalTo("CREATE STREAM S AS SELECT *\n" + + "FROM ADDRESS ADDRESS")); + } + + @Test + public void shouldFormatSelectStarCorrectlyWithOtherFields() { + final String statementString = "CREATE STREAM S AS SELECT *, address AS city FROM address;"; + final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) + .getStatement(); + assertThat(SqlFormatter.formatSql(statement), + equalTo("CREATE STREAM S AS SELECT\n" + + " *\n" + + ", ADDRESS.ADDRESS \"CITY\"\n" + + "FROM ADDRESS ADDRESS")); + } + + @Test + public void shouldFormatSelectStarCorrectlyWithJoin() { + final String statementString = "CREATE STREAM S AS SELECT address.*, itemid.* " + + "FROM address INNER JOIN itemid ON address.address = itemid.address->address;"; + final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) + .getStatement(); + assertThat(SqlFormatter.formatSql(statement), + equalTo("CREATE STREAM S AS SELECT\n" + + " ADDRESS.*\n" + + ", ITEMID.*\n" + + "FROM ADDRESS ADDRESS\n" + + "INNER JOIN ITEMID ITEMID ON ((ADDRESS.ADDRESS = ITEMID.ADDRESS->ADDRESS))")); + } + + @Test + public void shouldFormatSelectStarCorrectlyWithJoinOneSidedStar() { + final String statementString = "CREATE STREAM S AS SELECT address.*, itemid.ordertime " + + "FROM address INNER JOIN itemid ON address.address = itemid.address->address;"; + final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) + .getStatement(); + assertThat(SqlFormatter.formatSql(statement), + equalTo("CREATE STREAM S AS SELECT\n" + + " ADDRESS.*\n" + + ", ITEMID.ORDERTIME \"ORDERTIME\"\n" + + "FROM ADDRESS ADDRESS\n" + + "INNER JOIN ITEMID ITEMID ON ((ADDRESS.ADDRESS = ITEMID.ADDRESS->ADDRESS))")); + } + + @Test + public void shouldFormatSelectCorrectlyWithDuplicateFields() { + final String statementString = "CREATE STREAM S AS SELECT address AS one, address AS two FROM address;"; + final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) + .getStatement(); + assertThat(SqlFormatter.formatSql(statement), + equalTo("CREATE STREAM S AS SELECT\n" + + " ADDRESS.ADDRESS \"ONE\"\n" + + ", ADDRESS.ADDRESS \"TWO\"\n" + + "FROM ADDRESS ADDRESS")); } }