fix: multi-column keys are broken in some scenarios when rearranged #7477

Merged · 4 commits · May 11, 2021
@@ -17,7 +17,9 @@

import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
@@ -35,6 +37,7 @@
import io.confluent.ksql.util.GrammaticalJoiner;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -192,10 +195,13 @@ static Stream<ColumnName> orderColumns(
final LogicalSchema schema
Member:
To better understand the PR: what is `columns` and what is `schema` exactly? Is `columns` the list of selected columns from the SELECT clause (in the corresponding order)? Is `schema` the "physical" layout of the input (and what is `schema` for a join query)?

Contributor Author:
Honestly, this code is super weird to me; I was thinking the same thing, but erred against changing it. It's only ever called like this: orderColumns(getSchema().value(), getSchema()); so really, it could be done by just passing in one schema and then ensuring it's properly ordered.

) {
// When doing a `select *` key columns should be at the front of the column list
// but are added at the back during processing for performance reasons.
// but are added at the back during processing for performance reasons. Furthermore,
Member:
For my own education, what does this comment mean: "but are added at the back during processing for performance reasons" -- what is the perf impact and why?

Contributor Author:
I think the context here is that it avoids shuffling data (adding at the end of an array is easier than shuffling the whole array).
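The author's point can be illustrated with a small standalone sketch (a toy example, not ksql code): appending to an array-backed list is amortized O(1), while inserting at the front shifts every existing element.

```java
import java.util.ArrayList;
import java.util.List;

public class AppendVsPrepend {
    public static void main(String[] args) {
        List<String> columns = new ArrayList<>(List.of("V1", "V2", "V3"));

        // Appending a key column at the back touches no existing elements:
        columns.add("K1"); // amortized O(1)

        // Prepending instead shifts every element one slot to the right:
        columns.add(0, "K2"); // O(n) per insertion

        System.out.println(columns); // [K2, V1, V2, V3, K1]
    }
}
```

Hence keys end up at the back during processing, and orderColumns moves them to the front only once, at the end.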

// the keys should be selected in the same order as they appear in the source.
// Switch them around here:
final Stream<Column> keys = columns.stream()
.filter(c -> schema.isKeyColumn(c.name()));
final ImmutableMap<ColumnName, Column> columnsByName = Maps.uniqueIndex(columns, Column::name);
final Stream<Column> keys = schema.key().stream()
Member:
The main change (for the fix) seems to be to use schema instead of columns to find the keys?

Contributor Author:
Spot on, that's the first fix (the second fix is in the SelectionUtil).

.map(key -> columnsByName.get(key.name()))
.filter(Objects::nonNull);
Member:
Wondering why we need this filter?

Contributor Author:
Um, we might not need this one here after digging into it more - but from the API of this method, there's nothing preventing columns from not covering everything in schema, which would mean columnsByName.get(key.name()) would return null. Without refactoring this method signature I think it's better to be defensive here.

Contributor:
Reading further below, I feel this logic needs to be kept as-is even when we have #6374 in place with a clear separation (I'm assuming schema references the physical schema of the source, and columns references the logical schema mapped from the physical schema by then).

Member:
> I think it's better to be defensive here.

I guess that is the question? We know that it should never be null, so should we not fail fast to expose a bug?

Contributor Author:
We don't "know" that - I believe methods should not make assumptions about their callers. If you look only at this method, there's no guarantee that every key in the schema has a corresponding column that's passed in. If we wanted to prevent bugs, that check would belong in the caller (checking that every column that's passed in has a corresponding schema entry).


final Stream<Column> windowBounds = columns.stream()
.filter(c -> SystemColumns.isWindowBound(c.name()));
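The fixed orderColumns logic can be sketched in isolation. In the sketch below, `Col` and `orderedKeyAliases` are hypothetical simplifications (the real code works on ksql's `Column` and uses Guava's `Maps.uniqueIndex`, which builds an `ImmutableMap` keyed by a function of each element); the point is that iterating the source schema's keys, rather than filtering the projected columns, guarantees the key order matches the source, while the null filter defensively drops any schema key the projection never selected.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

public class OrderColumnsSketch {
    // Stand-in for ksql's Column: a source name plus a projected alias.
    record Col(String name, String alias) {}

    static List<String> orderedKeyAliases(List<Col> columns, List<String> schemaKeys) {
        // Plain-Map stand-in for Guava's Maps.uniqueIndex(columns, Column::name).
        Map<String, Col> columnsByName = columns.stream()
            .collect(Collectors.toMap(Col::name, Function.identity()));
        // Walk the schema's key order, not the projection order, and
        // defensively skip keys the projection did not select.
        return schemaKeys.stream()
            .map(columnsByName::get)
            .filter(Objects::nonNull)
            .map(Col::alias)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Projection selects K2 before K1; source key order is K1, K2.
        List<Col> selected = List.of(
            new Col("K2", "Y"), new Col("K1", "X"), new Col("V1", "V1"));
        System.out.println(orderedKeyAliases(selected, List.of("K1", "K2"))); // [X, Y]
    }
}
```

With the pre-fix version (filtering `columns` by `isKeyColumn`), the same input would have produced the keys in projection order, [Y, X], which is what broke the rearranged multi-column keys.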
@@ -15,7 +15,6 @@

package io.confluent.ksql.planner.plan;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
@@ -27,11 +26,15 @@
import io.confluent.ksql.parser.tree.SelectItem;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.Column.Namespace;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -41,6 +44,39 @@ public final class SelectionUtil {
private SelectionUtil() {
}

/*
* The algorithm behind this method feels unnecessarily complicated and is begging
* for someone to come along and improve it, but until that time here is
* a description of what's going on.
*
* Essentially, we need to build a logical schema that mirrors the physical
* schema until https://github.com/confluentinc/ksql/issues/6374 is addressed.
* That means that the keys must be ordered in the same way as the parent schema
* (e.g. if the source schema was K1 INT KEY, K2 INT KEY and the projection is
* SELECT K2, K1 this method will produce an output schema that is K1, K2
* despite the way that the keys were ordered in the projection) - see
* https://github.com/confluentinc/ksql/pull/7477 for context on the bug.
*
* But we cannot simply select all the keys and then the values, we must maintain
* the interleaving of key and values because transient queries return all columns
* to the user as "value columns". If someone issues a SELECT VALUE, * FROM FOO
* it is expected that VALUE shows up _before_ the key fields. This means we need to
* reorder the key columns within the list of projections without affecting the
* relative order of the keys/values.
*
* To spice things up even further, there's the possibility that the same key is
* aliased multiple times (SELECT K1 AS X, K2 AS Y FROM ...), which is not supported
* but is verified later when building the final projection - so we maintain it here.
*
* Now on to the algorithm itself: we make two passes through the list of projections.
* The first pass builds a mapping from source key to all the projections for that key.
* We will use this mapping to sort the keys in the second pass. This mapping is two
* dimensional to address the possibility of the same key with multiple aliases.
*
* The second pass goes through the list of projections again and builds the logical schema,
* but this time if we encounter a projection that references a key column, we instead take
* it from the list we built in the first pass (in order defined by the parent schema).
*/
Contributor:
Thanks for the detailed comment! Super helpful.

public static LogicalSchema buildProjectionSchema(
final LogicalSchema parentSchema,
final List<SelectExpression> projection,
@@ -51,24 +87,46 @@ public static LogicalSchema buildProjectionSchema(
functionRegistry
);

final Builder builder = LogicalSchema.builder();

final ImmutableMap.Builder<ColumnName, SqlType> keys = ImmutableMap.builder();
// keyExpressions[i] represents the expressions found in projection
// that are associated with parentSchema's key at index i
final List<List<SelectExpression>> keyExpressions = new ArrayList<>(parentSchema.key().size());
for (int i = 0; i < parentSchema.key().size(); i++) {
keyExpressions.add(new ArrayList<>());
}

// first pass to construct keyExpressions, keyExpressionMembership
// is just a convenience data structure so that we don't have to do
// the isKey check in the second iteration below
final Set<SelectExpression> keyExpressionMembership = new HashSet<>();
for (final SelectExpression select : projection) {
final Expression expression = select.getExpression();
if (expression instanceof ColumnReferenceExp) {
final ColumnName name = ((ColumnReferenceExp) expression).getColumnName();
parentSchema.findColumn(name)
.filter(c -> c.namespace() == Namespace.KEY)
.ifPresent(c -> {
keyExpressions.get(c.index()).add(select);
keyExpressionMembership.add(select);
});
}
}

final SqlType expressionType = expressionTypeManager
.getExpressionSqlType(expression);

final boolean keyColumn = expression instanceof ColumnReferenceExp
&& parentSchema.isKeyColumn(((ColumnReferenceExp) expression).getColumnName());

if (keyColumn) {
builder.keyColumn(select.getAlias(), expressionType);
keys.put(select.getAlias(), expressionType);
// second pass, which iterates the projections but ignores any key expressions,
// instead taking them from the ordered keyExpressions list
final Builder builder = LogicalSchema.builder();
int currKeyIdx = 0;
for (final SelectExpression select : projection) {
if (keyExpressionMembership.contains(select)) {
while (keyExpressions.get(currKeyIdx).isEmpty()) {
currKeyIdx++;
}
final SelectExpression keyExp = keyExpressions.get(currKeyIdx).remove(0);
final SqlType type = expressionTypeManager.getExpressionSqlType(keyExp.getExpression());
builder.keyColumn(keyExp.getAlias(), type);
} else {
builder.valueColumn(select.getAlias(), expressionType);
final Expression expression = select.getExpression();
final SqlType type = expressionTypeManager.getExpressionSqlType(expression);
builder.valueColumn(select.getAlias(), type);
}
}
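The two-pass algorithm above can be condensed into a runnable sketch over plain strings (`reorder` and its string-based schema are hypothetical simplifications; the real code operates on SelectExpression and LogicalSchema and re-resolves expression types):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TwoPassReorder {
    // Reorders the key references in `projection` to match the order of
    // `schemaKeys`, while value columns keep their positions: key/value
    // interleaving is preserved, only the keys among themselves are reordered.
    static List<String> reorder(List<String> projection, List<String> schemaKeys) {
        // First pass: bucket projected key references by schema key index.
        // Buckets are queues so that a key aliased multiple times keeps
        // all of its occurrences.
        List<Deque<String>> keyExpressions = new ArrayList<>();
        for (int i = 0; i < schemaKeys.size(); i++) {
            keyExpressions.add(new ArrayDeque<>());
        }
        for (String col : projection) {
            int idx = schemaKeys.indexOf(col);
            if (idx >= 0) {
                keyExpressions.get(idx).add(col);
            }
        }

        // Second pass: walk the projection again; at each key slot, emit
        // the next key in schema order instead of projection order.
        List<String> out = new ArrayList<>();
        int currKeyIdx = 0;
        for (String col : projection) {
            if (schemaKeys.contains(col)) {
                while (keyExpressions.get(currKeyIdx).isEmpty()) {
                    currKeyIdx++;
                }
                out.add(keyExpressions.get(currKeyIdx).remove());
            } else {
                out.add(col);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Source keys in order K1, K2; the projection selects K2 before K1.
        System.out.println(reorder(
            List.of("V1", "K2", "K1", "V2"),
            List.of("K1", "K2"))); // [V1, K1, K2, V2]
    }
}
```

Note how the keys land back in slots 2 and 3, where the projection put them, but now in source order, which is exactly the behavior the PARTITION BY test plan below exercises.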

@@ -0,0 +1,157 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (NAME STRING KEY, ID INTEGER, AGE INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER",
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "JSON"
},
"valueFormat" : {
"format" : "JSON"
},
"keyFeatures" : [ "UNWRAP_SINGLES" ]
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nPARTITION BY INPUT.ID, INPUT.NAME\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` INTEGER KEY, `NAME` STRING KEY, `AGE` INTEGER",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "JSON"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSelectKeyV2",
"properties" : {
"queryContext" : "PartitionBy"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "JSON"
},
"valueFormat" : {
"format" : "JSON"
},
"keyFeatures" : [ "UNWRAP_SINGLES" ]
},
"sourceSchema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER"
},
"keyExpression" : [ "ID", "NAME" ]
},
"keyColumnNames" : [ "ID", "NAME" ],
"selectExpressions" : [ "AGE AS AGE" ]
},
"formats" : {
"keyFormat" : {
"format" : "JSON"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}