fix: multi-column keys are broken in some scenarios when rearranged #7477

Merged
merged 4 commits into from
May 11, 2021

Conversation

agavra
Contributor

@agavra agavra commented May 5, 2021

Description

There are a few situations where key columns are re-arranged (differ from the source/partition by). You can reproduce like this:

CREATE STREAM inputStream (id INT KEY, a INT, b INT, c INT) WITH (kafka_topic='input', partitions='4');

INSERT INTO inputStream VALUES (1, 11, 21, 31);
INSERT INTO inputStream VALUES (2, 12, 22, 32);
INSERT INTO inputStream VALUES (1, 11, 21, 31);

CREATE STREAM repartitionedStream AS SELECT * FROM inputStream PARTITION BY c+5, b;
SELECT * FROM repartitionedStream EMIT CHANGES;

+-----------------+-----------------+-----------------+-----------------+-----------------+
|B                |KSQL_COL_0       |A                |C                |ID               |
+-----------------+-----------------+-----------------+-----------------+-----------------+
|36               |21               |11               |31               |1                |
|37               |22               |12               |32               |2                |
|36               |21               |11               |31               |1                |

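Note that the headers and values are misaligned: the B column shows the values of c+5, and KSQL_COL_0 shows the values of b. This is caused by a mismatch between the source schema and the projection schema. This PR fixes the issue in two places: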

  1. in PARTITION BY, a SELECT * is resolved using the source schema and doesn't take into account the ordering of the keys selected in the PARTITION BY clause
  2. in any SELECT (projection) clause where the keys are not ordered the same as the source, this PR just reorders it for the user so that it matches the source schema
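
The reordering idea in the two points above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: columns are plain strings, and `KeyOrderSketch` and `orderColumns` here are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class KeyOrderSketch {

  /**
   * Returns the projected columns with key columns first, in the order the
   * schema declares them, followed by the remaining columns in projection order.
   */
  static List<String> orderColumns(List<String> schemaKeys, List<String> projected) {
    Set<String> projectedSet = new LinkedHashSet<>(projected);
    List<String> ordered = new ArrayList<>();
    // Walk the schema's keys (not the projection) so the schema's key order wins.
    for (String key : schemaKeys) {
      if (projectedSet.contains(key)) { // skip keys the projection omitted
        ordered.add(key);
      }
    }
    // Append the non-key columns in the order they were projected.
    for (String col : projected) {
      if (!schemaKeys.contains(col)) {
        ordered.add(col);
      }
    }
    return ordered;
  }
}
```

For the reproduction above, calling `orderColumns(List.of("KSQL_COL_0", "B"), List.of("B", "KSQL_COL_0", "A", "C", "ID"))` yields `[KSQL_COL_0, B, A, C, ID]`, which lines the headers up with the values shown in the table.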

Testing done

QTT tests

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra requested a review from a team as a code owner May 5, 2021 23:39
@agavra agavra requested review from mjsax and vcrfxia May 5, 2021 23:39
// Switch them around here:
final Stream<Column> keys = columns.stream()
.filter(c -> schema.isKeyColumn(c.name()));
// but are added at the back during processing for performance reasons. Furthermore,
Member

For my own education. What does this comment mean: but are added at the back during processing for performance reasons -- what is the perf impact and why?

Contributor Author

I think the context here is that it avoids shuffling data (adding at the end of an array is easier than shuffling the whole array)
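
A rough illustration of that point, assuming a row is held in an array-backed list (the class and method names below are made up for the example and are not ksqlDB code): appending a key leaves the existing values in place, while inserting it at the front shifts every existing element.

```java
import java.util.ArrayList;
import java.util.List;

final class AppendVsInsertSketch {

  // Appending leaves the existing elements where they are.
  static List<Object> appendKey(List<Object> values, Object key) {
    List<Object> row = new ArrayList<>(values);
    row.add(key);
    return row;
  }

  // Inserting at index 0 shifts every existing element one slot to the right.
  static List<Object> prependKey(List<Object> values, Object key) {
    List<Object> row = new ArrayList<>(values);
    row.add(0, key);
    return row;
  }
}
```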

@@ -192,10 +195,13 @@ static void throwKeysNotIncludedError(
final LogicalSchema schema
Member

To better understand the PR. What is columns and what is schema exactly? Is columns the list of selected columns from the SELECT clause (in the corresponding order) ? Is schema the "physical" layout of the input (what is schema for a join query?)

Contributor Author

honestly, this code is super weird to me; I was thinking the same thing, but erred against changing it. It's only ever called like this: orderColumns(getSchema().value(), getSchema()); so really, it could be done by just passing in one schema and then ensuring it's properly ordered.

// they should be selected in the order of the key, if that is not the same as the
// ordering within the value. Switch them around here:
final ImmutableMap<ColumnName, Column> columnsByName = Maps.uniqueIndex(columns, Column::name);
final Stream<Column> keys = schema.key().stream()
Member

The main change (for the fix) seems to be to use schema instead of columns to find the keys?

Contributor Author

spot on, that's the first fix (the second fix is in SelectionUtil)

final ImmutableMap<ColumnName, Column> columnsByName = Maps.uniqueIndex(columns, Column::name);
final Stream<Column> keys = schema.key().stream()
.map(key -> columnsByName.get(key.name()))
.filter(Objects::nonNull);
Member

Wondering why we need this filter?

Contributor Author

um, we might not need this one here after digging into it more - but from the API of this method, there's nothing preventing columns from not covering everything in schema, which would mean columnsByName.get(key.name()) would return null. Without refactoring this method signature I think it's better to be defensive here.

Contributor

Reading further below, I feel this logic needs to be kept as-is even when we have #6374 in place with a clear separation (I'm assuming schema references the physical schema of the source, and columns references the logical schema mapped from the physical schema by then).

Member

I think it's better to be defensive here.

I guess that is the question? We know that it should never be null, so should we not fail fast to expose a bug?

Contributor Author

we don't "know" that - I believe methods should not make assumptions about their callers. If you look only at this method there's no guarantee that every key in the schema has a corresponding column that's passed in. If we wanted to prevent bugs, that would belong in the caller (checking that every column that's passed in has a corresponding schema entry)
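
The two positions in this thread can be compared side by side in a small sketch. It uses plain `java.util` maps rather than the `Column` and `LogicalSchema` types from the diff, and the names `KeyLookupSketch`, `lenient`, and `strict` are invented for the illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

final class KeyLookupSketch {

  // Defensive variant (what the diff does): keys with no matching column are dropped.
  static List<String> lenient(List<String> schemaKeys, Map<String, String> columnsByName) {
    return schemaKeys.stream()
        .map(columnsByName::get)        // null when no column was passed in for the key
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }

  // Fail-fast variant (the reviewer's suggestion): a missing key is reported immediately.
  static List<String> strict(List<String> schemaKeys, Map<String, String> columnsByName) {
    return schemaKeys.stream()
        .map(key -> Objects.requireNonNull(
            columnsByName.get(key), "no column for key " + key))
        .collect(Collectors.toList());
  }
}
```

With keys `AGE, ID` and only `ID` present in the map, `lenient` returns just the `ID` entry, while `strict` throws a `NullPointerException` naming `AGE`. Which behavior is preferable depends on where the "all keys must be selected" rule is meant to be enforced, which is the point under discussion.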

for (final Pair<ColumnName, SqlType> aliasAndType : aliasAndTypes) {
if (aliasAndType != null) {
// can be null if the key was not selected - this will cause a failure
// down the line, but here we just ignore it
Member

@mjsax mjsax May 6, 2021

Why can it cause an error? And how would it surface to the user? -- Not sure why we don't want to catch this error right here?

Contributor Author

you're required to select all keys, if you don't it'll complain :) I don't think we should add unnecessary redundant checks in places they don't belong. there's no reason why this specific code needs to know that you're required to select all keys. The code that requires that should enforce it.

"name": "multiple columns - select star - reorder columns",
"statements": [
"CREATE STREAM INPUT (NAME STRING, ID INT, AGE INT) with (kafka_topic='input', format='JSON');",
"CREATE STREAM OUTPUT AS select * from INPUT partition by AGE, ID;"
Member

meta question: The lower/upper case style is confusing to me.

Should we not upper case all keywords, and lower case all identifiers? This is all a wild mix making it hard to read.

CREATE STREAM input (name STRING, id INT, age INT) with (kafka_topic='input', format='JSON');
CREATE STREAM output AS SELECT * FROM input PARTITION BY age, id;

Member

Can we replicate this test (or can you point out existing tests) for:

  • input stream with existing multi-column key plus reordering: PARTITION BY key2,key1
  • PARTITION BY key, nonkey
  • PARTITION BY nonKey, key
  • all of the above replicated with SELECT columnNames (instead of *) with all 4 reordering combinations (either in SELECT or PARTITION BY); seems two are already covered
  • mixing key and values in select: SELECT key2, nonKey2, key1, nonKey1...

Contributor Author

  • "name": "multiple key columns - reordered", already exists
  • "name": "multiple columns - some key some value" already exists
  • I'll add that
  • multiple key columns tests this
  • the one I mentioned above tests this as well

As for the casing style, I just copied existing tests - happy to change the new ones but I'm not going to go and change the old ones for this PR

Member

I'm not going to go and change the old ones for this PR

Sure :) (did not expect that)

Contributor

@vcrfxia vcrfxia left a comment

Thanks @agavra and @mjsax for fixing (and finding) this bug! Are we planning to get this into 0.18 as well?

* The second pass goes through the list of projections again and builds the logical schema,
* but this time if we encounter a projection that references a key column, we instead take
* it from the list we built in the first pass (in order defined by the parent schema).
*/
Contributor

Thanks for the detailed comment! Super helpful.

final SelectExpression keyExp = keyExpressions.get(currKeyIdx).remove(0);
final SqlType type = expressionTypeManager.getExpressionSqlType(keyExp.getExpression());
builder.keyColumn(keyExp.getAlias(), type);
if (keyExpressions.get(currKeyIdx).isEmpty()) {
Contributor

What happens if a key isn't selected? (Should this be a while-loop rather than a simple if-check? If so, we also need to update currKeyIdx = 0; on line 117.)

Contributor Author

good call - made this a while loop and moved it up (before the remove) so no need to change line 117
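
A minimal sketch of the two-pass approach described in the Javadoc above, with the while loop placed before the remove, might look like the following. It is a simplification under stated assumptions: projections are plain column names with no aliases or expression types, and `TwoPassProjectionSketch` and `buildSchema` are hypothetical names, not the ones used in SelectionUtil.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoPassProjectionSketch {

  static List<String> buildSchema(List<String> parentKeys, List<String> projections) {
    // Pass 1: bucket key projections by the key's position in the parent schema.
    List<List<String>> keyExpressions = new ArrayList<>();
    for (int i = 0; i < parentKeys.size(); i++) {
      keyExpressions.add(new ArrayList<>());
    }
    for (String p : projections) {
      int idx = parentKeys.indexOf(p);
      if (idx >= 0) {
        keyExpressions.get(idx).add(p);
      }
    }

    // Pass 2: emit columns in projection order, but whenever a key is
    // encountered, take the next key in parent-schema order instead.
    List<String> schema = new ArrayList<>();
    int currKeyIdx = 0;
    for (String p : projections) {
      if (parentKeys.contains(p)) {
        // Skip buckets that are empty (key not selected, or already used up).
        while (keyExpressions.get(currKeyIdx).isEmpty()) {
          currKeyIdx++;
        }
        schema.add(keyExpressions.get(currKeyIdx).remove(0) + " KEY");
      } else {
        schema.add(p);
      }
    }
    return schema;
  }
}
```

With parent keys `K1, K2, K3` and projections `K3, V1, K1`, the sketch returns `[K1 KEY, V1, K3 KEY]`. The loop skips the empty bucket for the unselected `K2`; in this simplified version a single if-check after the remove would stop on that empty bucket and fail on the next key.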

{"name": "OUTPUT", "type": "stream", "schema": "NAME STRING KEY, ID INT KEY, AGE INT"}
]
}
},
Contributor

Thanks for the test coverage! This is great. Do we also need to audit the test coverage for GROUP BY?

Contributor Author

We probably should, though since GROUP BY doesn't use the same code we should do that separately (Matthias tested it locally and GROUP BYs have this "rearrange" behavior)

Contributor

@guozhangwang guozhangwang left a comment

The new algorithm looks good to me :) I think for #6374 itself we probably need to rely on this two-phase procedure anyway to separate logical columns from the physical schema, along with the maintained mapping between the two.

pom.xml Outdated
@@ -136,7 +136,7 @@
<scala.version>2.13.2</scala.version>
<apache.io.version>2.6</apache.io.version>
<io.confluent.ksql.version>6.2.0-0</io.confluent.ksql.version>
<io.confluent.schema-registry.version>${confluent.version.range}</io.confluent.schema-registry.version>
<io.confluent.schema-registry.version>6.2.0-685</io.confluent.schema-registry.version>
Member

Why this change?

Contributor Author

oops! I didn't mean to check that in... but without this things take way too long locally...

@agavra agavra merged commit 453ca8b into confluentinc:6.2.x May 11, 2021
@agavra agavra deleted the 6.2.x branch May 11, 2021 17:01