Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid duplicate column name errors from auto-generated aliases #4827

Merged
merged 10 commits into from
Mar 24, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@

package io.confluent.ksql.name;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public final class ColumnNames {

Expand All @@ -41,11 +50,24 @@ public static ColumnName aggregateColumn(final int idx) {
}

/**
* Where the user hasn't specified an alias for an expression in a SELECT we generate them using
* this method. This value is exposed to the user in the output schema
* Create a generator that will build column aliases in the form {@code KSQL_COL_x}.
*
* <p>Names are guaranteed not to clash with any existing columns in the {@code sourceSchemas}.
*
* <p>Used where the user hasn't specified an alias for an expression in a SELECT. This generated
* column names are exposed to the user in the output schema.
*
* @param sourceSchemas the stream of source schemas.
* @return a generator of unique column names.
*/
public static ColumnName generatedColumnAlias(final int idx) {
return ColumnName.of(GENERATED_ALIAS_PREFIX + idx);
public static Supplier<ColumnName> columnAliasGenerator(
final Stream<LogicalSchema> sourceSchemas
) {
final Set<Integer> used = generatedAliasIndexes(sourceSchemas)
.boxed()
.collect(Collectors.toSet());

return new AliasGenerator(0, used)::next;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if we pass 1 here we should get names more consistent with what we had before (start at offset 1), and a smaller test diff.

Copy link
Contributor Author

@big-andy-coates big-andy-coates Mar 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that too and tried it. However, the old code started at zero too. If we switch this to 1 we just get a different set of tests to change.

 private int selectItemIndex = 0;

The changes are because the name is no longer controlled by the index of the select expresssion.

}

/**
Expand All @@ -71,41 +93,52 @@ public static boolean isAggregate(final ColumnName name) {
return name.text().startsWith(AGGREGATE_COLUMN_PREFIX);
}

/**
* Determines the next unique column alias.
*
* <p>Finds any existing {@code KSQL_COL_x} column names in the supplied {@code sourceSchema} to
* ensure the returned generated column name is unique.
*
* @param sourceSchema the source schema.
* @return a column name in the form {@code KSQL_COL_x} which does not clash with source schema.
*/
public static ColumnName nextGeneratedColumnAlias(final LogicalSchema sourceSchema) {
final int maxExistingIdx = maxGeneratedAliasIndex(sourceSchema);
return generatedColumnAlias(maxExistingIdx + 1);
private static OptionalInt extractGeneratedAliasIndex(final ColumnName columnName) {
final Matcher matcher = GENERATED_ALIAS_PATTERN.matcher(columnName.text());
return matcher.matches()
? OptionalInt.of(Integer.parseInt(matcher.group(1)))
: OptionalInt.empty();
}

/**
* Determines the highest index of generated column names like {@code KSQL_COL_x} in the supplied
* {@code sourceSchema}.
*
* @param sourceSchema the schema.
* @return the highest index or {@code -1}
*/
private static int maxGeneratedAliasIndex(final LogicalSchema sourceSchema) {
return sourceSchema.columns().stream()
private static IntStream generatedAliasIndexes(final Stream<LogicalSchema> sourceSchema) {
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
return sourceSchema
.map(LogicalSchema::columns)
.flatMap(List::stream)
.map(Column::name)
.map(ColumnNames::extractGeneratedAliasIndex)
.filter(OptionalInt::isPresent)
.mapToInt(OptionalInt::getAsInt)
.max()
.orElse(-1);
.mapToInt(OptionalInt::getAsInt);
}

private static OptionalInt extractGeneratedAliasIndex(final ColumnName columnName) {
final Matcher matcher = GENERATED_ALIAS_PATTERN.matcher(columnName.text());
return matcher.matches()
? OptionalInt.of(Integer.parseInt(matcher.group(1)))
: OptionalInt.empty();
@VisibleForTesting
static final class AliasGenerator {

private final Set<Integer> used;
private int next;

AliasGenerator(final int initial, final Set<Integer> used) {
this.used = ImmutableSet.copyOf(used);
this.next = initial;
}

ColumnName next() {
return ColumnName.of(GENERATED_ALIAS_PREFIX + nextIndex());
}

private int nextIndex() {
int idx;

do {
idx = next++;

if (idx < 0) {
throw new KsqlException("Wow, you've managed to use up all possible generated aliases. "
+ "Impressive! Please provide explicit aliases to some of your columns");
}

} while (used.contains(idx));

return idx;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,56 +16,75 @@
package io.confluent.ksql.name;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.name.ColumnNames.AliasGenerator;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Test;

public class ColumnNamesTest {

@Test
public void shouldGenerateUniqueAliasesStartingAtZero() {
public void shouldStartGeneratingFromZeroIfSourceSchemasHaveNoGeneratedAliases() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.build();
final Supplier<ColumnName> generator = ColumnNames
.columnAliasGenerator(Stream.of(LogicalSchema.builder().build()));

// When:
final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema);
final ColumnName result = generator.get();

// Then:
assertThat(result, is(ColumnName.of("KSQL_COL_0")));
}

@Test
public void shouldGenerateUniqueAliasesTakingAnyKeyColumnsIntoAccount() {
public void shouldAvoidClashesWithSourceColumnNames() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(ColumnName.of("Fred"), SqlTypes.STRING)
.keyColumn(ColumnNames.generatedColumnAlias(1), SqlTypes.STRING)
.keyColumn(ColumnName.of("George"), SqlTypes.STRING)
.build();
final Supplier<ColumnName> generator = ColumnNames
.columnAliasGenerator(Stream.of(LogicalSchema.builder()
.keyColumn(ColumnName.of("Fred"), SqlTypes.STRING)
.keyColumn(ColumnName.of("KSQL_COL_3"), SqlTypes.STRING)
.keyColumn(ColumnName.of("KSQL_COL_1"), SqlTypes.STRING)
.valueColumn(ColumnName.of("KSQL_COL_1"), SqlTypes.STRING)
.valueColumn(ColumnName.of("George"), SqlTypes.STRING)
.valueColumn(ColumnName.of("KSQL_COL_5"), SqlTypes.STRING)
.build()
));

// When:
final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema);
final List<ColumnName> result = IntStream.range(0, 5)
.mapToObj(idx -> generator.get())
.collect(Collectors.toList());

// Then:
assertThat(result, is(ColumnName.of("KSQL_COL_2")));
assertThat(result, contains(
ColumnName.of("KSQL_COL_0"),
ColumnName.of("KSQL_COL_2"),
ColumnName.of("KSQL_COL_4"),
ColumnName.of("KSQL_COL_6"),
ColumnName.of("KSQL_COL_7")
));
}

@Test
public void shouldGenerateUniqueAliasesTakingAnyValueColumnsIntoAccount() {
public void shouldThrowIfIndexOverflows() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(ColumnName.of("Fred"), SqlTypes.STRING)
.valueColumn(ColumnNames.generatedColumnAlias(1), SqlTypes.STRING)
.valueColumn(ColumnName.of("George"), SqlTypes.STRING)
.build();
final AliasGenerator generator =
new AliasGenerator(Integer.MAX_VALUE, ImmutableSet.of());

// When:
final ColumnName result = ColumnNames.nextGeneratedColumnAlias(schema);
generator.next(); // returns MAX_VALUE.

// Then:
assertThat(result, is(ColumnName.of("KSQL_COL_2")));
// When:
assertThrows(KsqlException.class, generator::next);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.engine.rewrite;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.execution.expression.tree.DereferenceExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
Expand All @@ -38,9 +40,9 @@
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
* Validate and clean ASTs generated from externally supplied statements
Expand Down Expand Up @@ -69,13 +71,16 @@ private static final class RewriterPlugin extends
final MetaStore metaStore;
final DataSourceExtractor dataSourceExtractor;

private int selectItemIndex = 0;
private Supplier<ColumnName> aliasGenerator;

RewriterPlugin(final MetaStore metaStore, final DataSourceExtractor dataSourceExtractor) {
super(Optional.empty());
this.metaStore = Objects.requireNonNull(metaStore, "metaStore");
this.dataSourceExtractor
= Objects.requireNonNull(dataSourceExtractor, "dataSourceExtractor");
this.metaStore = requireNonNull(metaStore, "metaStore");
this.dataSourceExtractor = requireNonNull(dataSourceExtractor, "dataSourceExtractor");
this.aliasGenerator = ColumnNames.columnAliasGenerator(
dataSourceExtractor.getAllSources().stream()
.map(DataSource::getSchema)
);
}

@Override
Expand Down Expand Up @@ -128,16 +133,16 @@ protected Optional<AstNode> visitSingleColumn(
final StatementRewriter.Context<Void> ctx
) {
if (singleColumn.getAlias().isPresent()) {
selectItemIndex++;
return Optional.empty();
}

final ColumnName alias;
final Expression expression = ctx.process(singleColumn.getExpression());
if (expression instanceof QualifiedColumnReferenceExp) {
final SourceName source = ((QualifiedColumnReferenceExp) expression).getQualifier();
final ColumnName name = ((QualifiedColumnReferenceExp) expression).getColumnName();
if (dataSourceExtractor.isJoin()
&& dataSourceExtractor.getCommonFieldNames().contains(name)) {
&& dataSourceExtractor.getCommonColumnNames().contains(name)) {
alias = ColumnNames.generatedJoinColumnAlias(source, name);
} else {
alias = name;
Expand All @@ -152,9 +157,9 @@ protected Optional<AstNode> visitSingleColumn(
dereferenceExpressionString.substring(
dereferenceExpressionString.indexOf(KsqlConstants.DOT) + 1)));
} else {
alias = ColumnNames.generatedColumnAlias(selectItemIndex);
alias = aliasGenerator.get();
}
selectItemIndex++;

return Optional.of(
new SingleColumn(singleColumn.getLocation(), expression, Optional.of(alias))
);
Expand Down Expand Up @@ -188,9 +193,8 @@ private static final class ExpressionRewriterPlugin extends
final MetaStore metaStore,
final DataSourceExtractor dataSourceExtractor) {
super(Optional.empty());
this.metaStore = Objects.requireNonNull(metaStore, "metaStore");
this.dataSourceExtractor
= Objects.requireNonNull(dataSourceExtractor, "dataSourceExtractor");
this.metaStore = requireNonNull(metaStore, "metaStore");
this.dataSourceExtractor = requireNonNull(dataSourceExtractor, "dataSourceExtractor");
}

@Override
Expand Down
Loading