Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix repartition semantics #4816

Merged

Conversation

big-andy-coates
Copy link
Contributor

@big-andy-coates big-andy-coates commented Mar 18, 2020

Description

Fixes: #4749. (Note: the feature is currently disabled behind the 'allow any key column name' feature flag).

Background

This change fixes an issue with our repartition semantics.

Old style query semantics for partition by are broken:

S1: ROWKEY => B, C (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;

S2: ROWKEY => B, C

As you can see the schema of S2 is still the same. However, the old data in the key has been lost as its been overridden with the data from B, and the key now duplicates the data stored in B.

This loss of data on a SELECT * .. PARTITION BY needed fixing.

Secondly, with new primitive key work to remove the restriction on key column naming, the same query semantics do not work. e.g.

S1: A => B, C

CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;

S2: B => B, C

This fails as the B value column clashes with the B key column!

The fix

This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column sees the old key column being moved to the value and the new key column being moved from the value to the key. For example,

S1: A => B, C

CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;

Results in the schema: S2: B => C, A.

If a PARTITION BY is an expression other than a column reference, then ksql will synthesis a new column name. For example,

S1: A => B, C

CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT);

Results in the schema: S2: KSQL_COL_0 => B, C, A.

This github issue will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.

The approach

There are main changes:

  1. The LogicalPlanner has been updated to build the new repartitioned schema via a new PartitionByParamsFactory class.

  2. The streams topology is built differently by introducing a new version of the SelectKey plan step. The data passed is the same as the old version. However, the new version is handled by a new builder, which knows to build the streams topology in the right way.

It would also have been possible to achieve the second step by adding a defaulted flag to the existing SelectKey step. However, it was felt that clear separation was better. This means once we go version 1.0 we can just delete the old V1 step, rather than trying to unpick the code that handled a boolean flag.

Testing done

Usual.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

Fixes: confluentinc#4749

##### Background

This change fixes an issue with our repartition semantics.

Old style query semantics for partition by are broken:

S1: ROWKEY => B, C  (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: ROWKEY => B, C

As you can see the schema of S2 is still the same.  However, the old data in the key has been lost as its been overridden with the data from B, and the key now duplicates the data stored in B.

This loss of data on a `SELECT * .. PARTITION BY` needed fixing.

Secondly, with new primitive key [work to remove the restriction on key column naming](confluentinc#3536), the same query semantics do not work. e.g.

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: B => B, C

This fails as the `B` value column clashes with the `B` key column!

##### The fix

This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column sees the old key column being moved to the value and the new key column being moved from the value to the key. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

Results in the schema: S2: B => C, A.

If a PARTITION BY is an expression other than a column reference, then ksql will synthesis a new column name. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT);
```

Results in the schema: S2: KSQL_COL_0 => B, C, A.

[This github issue](confluentinc#4813) will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.
Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanations! LGTM

@big-andy-coates big-andy-coates merged commit 609e9e2 into confluentinc:master Mar 20, 2020
@big-andy-coates big-andy-coates deleted the paratition_by_semantics branch March 20, 2020 15:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Fix PARTITION BY semantics
2 participants