fix: do not filter out rows where PARTITION BY resolves to null #4823

Merged

@big-andy-coates (Contributor) commented Mar 19, 2020

Description

Note: stacked on top of #4816 (which is now merged).

Fixes: #4747

This commit removes the filter that was excluding any rows where the PARTITION BY clause resolved to a null value, i.e. either because the result was null or because an error occurred evaluating the expression.

This change only affects newly started queries. Pre-existing queries will continue to run as before.
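
To make the behavioural change concrete, here is a minimal Java sketch of the two semantics. It is an illustration only, not the ksqlDB implementation: the class `PartitionByNullSketch` and its methods are hypothetical names, and rows are modelled as plain objects with the `PARTITION BY` expression as a `Function`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a repartition step; NOT the ksqlDB implementation.
final class PartitionByNullSketch {

  // Evaluates the PARTITION BY expression for one row.
  // An evaluation error resolves to a null key, as described above.
  static <R, K> K resolveKey(Function<R, K> partitionBy, R row) {
    try {
      return partitionBy.apply(row);
    } catch (RuntimeException e) {
      return null;
    }
  }

  // Old behaviour: rows whose new key resolved to null were filtered out.
  static <R, K> List<K> keysBeforeFix(List<R> rows, Function<R, K> partitionBy) {
    List<K> keys = new ArrayList<>();
    for (R row : rows) {
      K key = resolveKey(partitionBy, row);
      if (key != null) {
        keys.add(key);
      }
    }
    return keys;
  }

  // New behaviour: every row is passed through, even if its key is null.
  static <R, K> List<K> keysAfterFix(List<R> rows, Function<R, K> partitionBy) {
    List<K> keys = new ArrayList<>();
    for (R row : rows) {
      keys.add(resolveKey(partitionBy, row));
    }
    return keys;
  }
}
```

With the rows `"1"`, `"x"`, `"2"` and an expression that parses each row as an integer, the old semantics emit two rows and the new semantics emit three, the middle one with a `null` key.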

Testing done

QTT tests added.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g., if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

Fixes: confluentinc#4749

##### Background

This change fixes an issue with our repartition semantics.

Old style query semantics for partition by are broken:

S1: ROWKEY => B, C  (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: ROWKEY => B, C

As you can see, the schema of S2 is still the same. However, the old data in the key has been lost, as it has been overwritten with the data from B, and the key now duplicates the data stored in B.

This loss of data on a `SELECT * .. PARTITION BY` needed fixing.

Secondly, with the new primitive key [work to remove the restriction on key column naming](confluentinc#3536), the same query semantics do not work, e.g.

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: B => B, C

This fails as the `B` value column clashes with the `B` key column!

##### The fix

This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column sees the old key column being moved to the value and the new key column being moved from the value to the key. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

Results in the schema: S2: B => C, A.

If a PARTITION BY is an expression other than a column reference, then ksql will synthesize a new column name. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT);
```

Results in the schema: S2: KSQL_COL_0 => B, C, A.
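
The two cases above can be sketched in Java as follows. This is a rough model under stated assumptions, not the code in this commit: `PartitionBySchemaSketch`, `byColumn` and `byExpression` are hypothetical names, a schema is reduced to a key name plus an ordered list of value column names, and the synthesized name is hard-coded to the `KSQL_COL_0` shown in the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the schema change; NOT the code in this commit.
final class PartitionBySchemaSketch {

  // PARTITION BY <value column>: the column moves from the value to the key,
  // and the old key column is appended to the value.
  // Partitioning by the existing key column is not modelled here.
  static String byColumn(String oldKey, List<String> values, String column) {
    List<String> newValues = new ArrayList<>(values);
    if (!newValues.remove(column)) {
      throw new IllegalArgumentException("Unknown value column: " + column);
    }
    newValues.add(oldKey);
    return column + " => " + String.join(", ", newValues);
  }

  // PARTITION BY <other expression>: the key gets a synthesized name,
  // all value columns are kept, and the old key is appended to the value.
  static String byExpression(String oldKey, List<String> values) {
    List<String> newValues = new ArrayList<>(values);
    newValues.add(oldKey);
    return "KSQL_COL_0 => " + String.join(", ", newValues);
  }
}
```

For `S1: A => B, C`, `byColumn("A", List.of("B", "C"), "B")` gives `B => C, A`, and `byExpression("A", List.of("B", "C"))` gives `KSQL_COL_0 => B, C, A`, matching the two schemas above.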

[This GitHub issue](confluentinc#4813) will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.
@big-andy-coates big-andy-coates requested a review from a team as a code owner March 19, 2020 14:19
@big-andy-coates
Contributor Author

FYI, the docs call out the limitations of partitioning by a null, and suggest using COALESCE from #4829 to avoid them.

Member

@JimGalasyn JimGalasyn left a comment

LGTM, with a couple of suggestions.

@agavra agavra mentioned this pull request Mar 19, 2020
2 tasks
big-andy-coates and others added 5 commits March 20, 2020 10:44
Conflicting files
ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json
ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParams.java
ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java
ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java
ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java
ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
Contributor Author

Note: it's safe to change these as they are from an unreleased feature...

Contributor

@agavra agavra left a comment

I'm not sure I understand why this is backwards compatible on a server restart - is the predicate somehow encoded in the query plan?

Contributor

@agavra agavra left a comment

Andy answered my question offline: old queries will go through the StreamSelectKeyBuilderV1, which maintains the old behavior.

@big-andy-coates big-andy-coates merged commit e75a792 into confluentinc:master Mar 23, 2020
@big-andy-coates big-andy-coates deleted the partition_by_null branch March 23, 2020 16:00

Successfully merging this pull request may close these issues.

KSQL should not filter out stream rows with null key
3 participants