Primitive Keys: allow key names other than ROWKEY #3536

Closed
big-andy-coates opened this issue Oct 11, 2019 · 0 comments · Fixed by #5093
Labels
engine P0 Denotes must-have for a given milestone

big-andy-coates commented Oct 11, 2019

Before we can add support for JSON/PB/Avro key formats we'll need to be able to support key columns with any name.

For example, let's say we have a PB schema with a single int field called `id`. Users will need to be able to create a source from this with a schema containing `ID INT KEY`. It would be very unintuitive to require them to write `ROWKEY INT KEY`.

If they were to create a sink topic from this source, e.g. via `CREATE STREAM OUTPUT AS SELECT * FROM INPUT;`, then users will expect the output to be a PB message with a single field called `ID`, not `ROWKEY`.

Hence, it looks like we need to solve this issue before we can solve #4461.

@big-andy-coates big-andy-coates self-assigned this Oct 11, 2019
big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Feb 7, 2020
ksqlDB now supports the following primitive key types: `INT`, `BIGINT`, `DOUBLE` as well as the existing `STRING` type.

The key type can be defined in the CREATE TABLE or CREATE STREAM statement by including a column definition for `ROWKEY` in the form `ROWKEY <primitive-key-type> KEY,`, for example:

```sql
CREATE TABLE USERS (ROWKEY BIGINT KEY, NAME STRING, RATING DOUBLE) WITH (kafka_topic='users', VALUE_FORMAT='json');
```

ksqlDB currently requires the name of the key column to be `ROWKEY`. Support for arbitrary key names is tracked by confluentinc#3536.

ksqlDB currently requires keys to use the `KAFKA` format. Support for additional formats is tracked by https://github.com/confluentinc/ksql/projects/3.

Schema inference currently only works with `STRING` keys. Support for additional key types is tracked by confluentinc#4462. (Schema inference is where ksqlDB infers the schema of a CREATE TABLE or CREATE STREAM statement from the schema registered in the Schema Registry, as opposed to the user supplying the set of columns in the statement.)

Apache Kafka Connect can be configured to output keys in the `KAFKA` format by using a Converter, e.g. `"key.converter": "org.apache.kafka.connect.converters.IntegerConverter"`. Details of which converter to use for which key type can be found here: https://docs.confluent.io/current/ksql/docs/developer-guide/serialization.html#kafka in the `Connect Converter` column.
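
As a rough illustration, the choice of converter per key type can be sketched as a simple lookup. The converter class names below are the built-in ones that ship with Apache Kafka; the dictionary and the `key_converter_config` helper are hypothetical names used only for this sketch, and the serialization docs linked above remain the authoritative reference.

```python
# Sketch: pick a Kafka Connect key converter for a ksqlDB primitive key type.
# The class names are Apache Kafka's built-in converters; the helper itself
# is illustrative and not part of ksqlDB or Connect.
KEY_CONVERTERS = {
    "INT": "org.apache.kafka.connect.converters.IntegerConverter",
    "BIGINT": "org.apache.kafka.connect.converters.LongConverter",
    "DOUBLE": "org.apache.kafka.connect.converters.DoubleConverter",
    "STRING": "org.apache.kafka.connect.storage.StringConverter",
}


def key_converter_config(ksql_key_type):
    """Return the connector config fragment for the given key type."""
    try:
        converter = KEY_CONVERTERS[ksql_key_type.upper()]
    except KeyError:
        raise ValueError(f"unsupported primitive key type: {ksql_key_type}")
    return {"key.converter": converter}


print(key_converter_config("int"))
```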

@rmoff has written an introductory blog about primitive keys: https://rmoff.net/2020/02/07/primitive-keys-in-ksqldb/

BREAKING CHANGE: existing queries that perform a PARTITION BY or GROUP BY on a single column of one of the above supported primitive key types will now set the key to the appropriate type, not a `STRING` as previously.
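
To make the breaking change concrete: the `KAFKA` format corresponds to Kafka's standard primitive serializers, so an `INT` key is written as 4 big-endian bytes rather than as UTF-8 text. The sketch below mimics those encodings with Python's `struct` module. The function name `kafka_key_bytes` is made up for illustration and this is a model of the byte layout, not ksqlDB code.

```python
import struct

# Big-endian layouts matching Kafka's Integer/Long/Double serializers.
_FORMATS = {"INT": ">i", "BIGINT": ">q", "DOUBLE": ">d"}


def kafka_key_bytes(key_type, value):
    """Encode a key the way the corresponding Kafka serializer would."""
    if key_type == "STRING":
        return str(value).encode("utf-8")
    return struct.pack(_FORMATS[key_type], value)


# The same logical key, before and after the change described above.
old_key = kafka_key_bytes("STRING", 42)  # key written as text
new_key = kafka_key_bytes("INT", 42)     # key written as a 4-byte integer
print(old_key, new_key)
```

Consumers that deserialize the key as a string would therefore need to switch to the matching primitive deserializer.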
big-andy-coates added a commit that referenced this issue Feb 7, 2020
* feat: primitive key support

big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 2, 2020
big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 4, 2020
Partial fix for: confluentinc#3536

First part of supporting key column names other than `ROWKEY`.

With this initial pass you can now name your key columns anything you want in your `CREATE TABLE` and `CREATE STREAM` statements, e.g.

```sql
CREATE STREAM S (ID INT KEY, NAME STRING) WITH (...);
```

Any GROUP BY, PARTITION BY or JOIN on the key column results in any created data source having a key column with a matching name, e.g.

```sql
-- schema of T: ID INT KEY, COUNT BIGINT
CREATE TABLE T AS SELECT COUNT(*) AS COUNT FROM S GROUP BY ID;
```

Pull and push queries work as expected and quoted identifiers work too.

However, this functionality is not complete yet.
Hence it is guarded by the `ksql.any.key.name.enabled` feature flag, which defaults to off.
The following big-ticket items remain:

* PARTITION BY a single value column should result in a stream with a key column whose name matches the value column name.
* GROUP BY a single value column should result in a table with a key column whose name matches the value column name.
* JOIN on a single value column should result in a stream/table with a key column whose name matches the value column name.

This additional work will be tracked under the same ticket, i.e. confluentinc#3536.
@big-andy-coates big-andy-coates added this to the 0.8.0 milestone Mar 4, 2020
big-andy-coates added a commit that referenced this issue Mar 5, 2020
* chore: partial support for arbitrarily named key columns

big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 16, 2020
Prep for confluentinc#4749.

This commit changes the way the engine resolves '*' in a projection, e.g. `SELECT * FROM X;`.

Previously, the `Analyzer` was responsible for expanding the `*` into the set of columns of each source. However, this code was getting complicated and would become much more complicated once the key column can have any name (confluentinc#3536). The complexity comes about because the `Analyzer` would need to determine the presence of joins, group bys, partition bys, etc., which can affect how `*` is resolved. This logic duplicates the logic in the `LogicalPlanner` and `PlanNode` sub-classes.

With this commit, the logical plan and planner become responsible for resolving any `*` in the projection. This is achieved by asking the parent of the projection node to resolve the `*` into the set of columns. Parent node types that do not know how to resolve the `*`, e.g. `FilterNode`, forward requests to their parents. In this way, the resolution request ripples up the logical plan until it reaches a `DataSourceNode`, which can resolve the `*` into a list of columns. `JoinNode` knows how to forward `*`, `left.*` and `right.*` appropriately.
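
The forwarding scheme can be sketched with a toy plan. This is a minimal Python model, not the engine's Java code: the class names mirror the node types mentioned above, but the `resolve_star` and `columns` methods, the `parent` attribute and the qualified `SOURCE.COLUMN` output format are assumptions made for the example.

```python
class PlanNode:
    """Base node: by default, forward '*' resolution to the parent node."""

    def __init__(self, parent=None):
        self.parent = parent

    def resolve_star(self, source_name=None):
        if self.parent is None:
            raise ValueError("no node in the plan could resolve '*'")
        return self.parent.resolve_star(source_name)


class DataSourceNode(PlanNode):
    """End of the chain: knows its own columns, so it can expand '*'."""

    def __init__(self, name, columns):
        super().__init__()
        self.name = name
        self.columns = list(columns)

    def resolve_star(self, source_name=None):
        if source_name not in (None, self.name):
            return []  # a qualified 'other.*' does not match this source
        return [f"{self.name}.{c}" for c in self.columns]


class FilterNode(PlanNode):
    """Cannot resolve '*' itself, so it inherits the forwarding behaviour."""


class JoinNode(PlanNode):
    """Forwards '*' to both sides; 'left.*' / 'right.*' match one side only."""

    def __init__(self, left, right):
        super().__init__()
        self.left, self.right = left, right

    def resolve_star(self, source_name=None):
        return (self.left.resolve_star(source_name)
                + self.right.resolve_star(source_name))


class ProjectNode(PlanNode):
    def __init__(self, parent, selects):
        super().__init__(parent)
        self.selects = list(selects)

    def columns(self):
        out = []
        for item in self.selects:
            if item == "*":
                out += self.parent.resolve_star()
            elif item.endswith(".*"):
                out += self.parent.resolve_star(item[:-2])
            else:
                out.append(item)
        return out


left = DataSourceNode("L", ["ID", "A"])
right = DataSourceNode("R", ["ID", "B"])
plan = ProjectNode(FilterNode(JoinNode(left, right)), ["*"])
print(plan.columns())  # ['L.ID', 'L.A', 'R.ID', 'R.B']
```

The point of the design is that only nodes that change what `*` means need to override the resolution step; every other node type simply passes the request along.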

Previously, the list of `SelectExpressions` was passed down from parent `PlanNode` to child, allowing some nodes to rewrite the expressions. For example, `FlatMapNode` would rewrite any expression involving a TableFunction to use the internal names like `KSQL_SYNTH_0`.

With this commit this is no longer necessary. Instead, when building a projection node, the planner asks its parent node to resolve any selects, allowing the parent to perform any rewrite.

At the moment, the planner is still responsible for much of this work. In the future, this logic may move into the plan itself. However, such a change would increase the complexity of this commit.
@big-andy-coates big-andy-coates mentioned this issue Mar 16, 2020
2 tasks
big-andy-coates added a commit that referenced this issue Mar 18, 2020
* refactor: projection expression handling

Co-authored-by: Andy Coates <[email protected]>
@vcrfxia vcrfxia modified the milestones: 0.8.0, 0.9.0 Mar 18, 2020
big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 18, 2020
Fixes: confluentinc#4749

##### Background

This change fixes an issue with our repartition semantics.

Old style query semantics for partition by are broken:

S1: ROWKEY => B, C  (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: ROWKEY => B, C

As you can see, the schema of S2 is still the same. However, the old data in the key has been lost as it's been overwritten with the data from B, and the key now duplicates the data stored in B.

This loss of data on a `SELECT * .. PARTITION BY` needed fixing.

Secondly, with the new primitive key [work to remove the restriction on key column naming](confluentinc#3536), the same query semantics do not work, e.g.

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: B => B, C

This fails as the `B` value column clashes with the `B` key column!

##### The fix

This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column sees the old key column being moved to the value and the new key column being moved from the value to the key. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

Results in the schema: S2: B => C, A.

If a PARTITION BY is an expression other than a column reference, then ksqlDB will synthesize a new column name. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT);
```

Results in the schema: S2: KSQL_COL_0 => B, C, A.
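
The two cases above can be modelled with a few lines of Python. This is only a sketch of the resulting schemas as described in this commit message: the function name `repartition_schema` is hypothetical, schemas are reduced to column names, and partitioning by the existing key column is not modelled.

```python
def repartition_schema(key, values, partition_by):
    """Model the schema of SELECT * FROM <source> PARTITION BY <partition_by>.

    key          -- name of the source's key column
    values       -- names of the source's value columns, in order
    partition_by -- a value column name, or any other expression text
    Returns (new_key, new_values).
    """
    if partition_by in values:
        # Column reference: the column moves from the value to the key,
        # and the old key column is appended to the value.
        new_values = [v for v in values if v != partition_by] + [key]
        return partition_by, new_values
    # Any other expression: the key gets a synthesized name, all value
    # columns are kept, and the old key column is appended to the value.
    return "KSQL_COL_0", list(values) + [key]


print(repartition_schema("A", ["B", "C"], "B"))               # ('B', ['C', 'A'])
print(repartition_schema("A", ["B", "C"], "CAST(B AS INT)"))  # ('KSQL_COL_0', ['B', 'C', 'A'])
```

In both cases no data is lost: every source column still appears exactly once in the output schema.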

[This GitHub issue](confluentinc#4813) will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.
big-andy-coates added a commit that referenced this issue Mar 20, 2020
* fix: fix repartition semantics

Co-authored-by: Andy Coates <[email protected]>
big-andy-coates added a commit that referenced this issue Mar 23, 2020
* fix: fix repartition semantics

* fix: do not filter out rows where PARTITION BY resolves to null

Fixes: #4747

This commit removes the filter that was excluding any rows where the `PARTITION BY` clause resolved to a `null` value, i.e. either because the result was `null` or because an error occurred evaluating the expression.

This change will only affect new queries started. Pre-existing queries will continue to run as before.
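
The behavioural difference can be illustrated with a small model, assuming rows are plain dictionaries and the `PARTITION BY` expression is a Python callable. The `repartition` function and its `drop_null_keys` flag are hypothetical names for this sketch, not ksqlDB APIs.

```python
def repartition(rows, key_fn, drop_null_keys=False):
    """Re-key rows; optionally mimic the old filter on null keys."""
    out = []
    for row in rows:
        try:
            key = key_fn(row)
        except Exception:
            key = None  # an evaluation error resolves to a null key
        if key is None and drop_null_keys:
            continue  # old behaviour: the row was silently excluded
        out.append((key, row))
    return out


rows = [{"B": 1}, {"B": None}, {}]
old = repartition(rows, lambda r: r["B"], drop_null_keys=True)
new = repartition(rows, lambda r: r["B"])
print(len(old), len(new))  # 1 3
```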

* docs: call out limitation of partition by NULL
@big-andy-coates big-andy-coates added the P0 Denotes must-have for a given milestone label Mar 25, 2020
big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 30, 2020
This commit removes any use of `ROWKEY` from the DataGen app. This is being done as part of the work to allow any key column name (confluentinc#3536).

This is a compatible change as the key column name is not persisted in the data sent to Kafka. Hence the output doesn't change.
big-andy-coates added a commit that referenced this issue Apr 6, 2020
Co-authored-by: Andy Coates <[email protected]>
big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Apr 17, 2020
fixes: confluentinc#3536

Previously, stream `KEY` columns and table `PRIMARY KEY` columns had to be named `ROWKEY`. This change removes this restriction. Key columns can now have _any_ name.

For example, a table of users with the `BIGINT` user id stored in the Kafka record's key may have previously been defined as:

```sql
CREATE TABLE USERS (ROWKEY BIGINT PRIMARY KEY, NAME STRING)
   WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON');
```

With this change, this can now be defined with a more appropriate name for the table's primary key:

```sql
CREATE TABLE USERS (ID BIGINT PRIMARY KEY, NAME STRING)
   WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON');
```

To get around the `ROWKEY` column name requirement, previous versions of ksqlDB allowed an alias to the key to be defined in the WITH clause, for example:

```sql
CREATE TABLE USERS (ROWKEY BIGINT PRIMARY KEY, NAME STRING, ID BIGINT)
   WITH('KEY'='ID', 'KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON');
```

The downside of this approach is that it required a _copy_ of the key column in the value of the Kafka record, which was often not present. This is no longer required. The above statement can now also be simplified to:

```sql
CREATE TABLE USERS (ID BIGINT PRIMARY KEY, NAME STRING)
   WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON');
```
@big-andy-coates big-andy-coates mentioned this issue Apr 17, 2020
2 tasks
@big-andy-coates big-andy-coates modified the milestones: 0.9.0, 0.10.0 Apr 24, 2020
big-andy-coates added a commit that referenced this issue May 10, 2020
* feat: remove restriction on key column name

BREAKING CHANGE:

Statements containing PARTITION BY, GROUP BY or JOIN clauses will now produce different output schemas.

For PARTITION BY and GROUP BY statements, the name of the key column in the result is determined by the PARTITION BY or GROUP BY clause:

1. Where the partitioning or grouping is a single column reference, then the key column will have the same name as this column. For example:

```sql
CREATE STREAM OUTPUT AS
   SELECT * FROM INPUT PARTITION BY X;
-- OUTPUT will have a key column called X;
```

2. Where the partitioning or grouping is a single struct field, then the key column will have the same name as the field. For example:

```sql
CREATE STREAM OUTPUT AS
   SELECT * FROM INPUT PARTITION BY X->field1;
-- OUTPUT will have a key column called FIELD1;
```

3. Otherwise, the key column name will be system generated and be in the form `KSQL_COL_n`, where `n` is some positive integer.
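
The three rules above can be sketched as a small name-resolution function. This is a simplified model: it recognises only unquoted identifiers with regular expressions, upper-cases them as the `FIELD1` example suggests, and always uses `0` as the generated suffix unless told otherwise. The name `key_column_name` is hypothetical.

```python
import re

_IDENT = r"[A-Za-z_][A-Za-z0-9_]*"
_COLUMN = re.compile(rf"^{_IDENT}$")
_STRUCT_FIELD = re.compile(rf"^{_IDENT}(->{_IDENT})+$")


def key_column_name(expression, n=0):
    """Name of the key column produced by PARTITION BY / GROUP BY <expression>."""
    expr = expression.strip()
    if _COLUMN.match(expr):
        return expr.upper()                         # rule 1: column reference
    if _STRUCT_FIELD.match(expr):
        return expr.rsplit("->", 1)[1].upper()      # rule 2: struct field
    return f"KSQL_COL_{n}"                          # rule 3: system generated


print(key_column_name("X"))               # X
print(key_column_name("X->field1"))       # FIELD1
print(key_column_name("CAST(B AS INT)"))  # KSQL_COL_0
```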

In all cases, except where grouping by more than one column, the new key column's name can be set by defining an alias in the projection. For example:

```sql
CREATE TABLE OUTPUT AS 
   SELECT USERID AS ID, COUNT(*) FROM USERS GROUP BY USERID;
-- OUTPUT will have a key column named ID.
```

For groupings of multiple expressions there is currently no way to provide a name for the system generated key column. This is a shortcoming that will be fixed shortly when ksqlDB supports more than just a single key column.

For JOIN statements, the name of the key column in the result is determined by the join criteria.

1. For INNER and LEFT OUTER joins where the join criteria contain at least one column reference, the key column will be named after the leftmost source whose join criterion is a column reference. For example:

```sql
CREATE TABLE OUTPUT AS 
    SELECT * FROM I1 JOIN I2 ON abs(I1.ID) = I2.ID JOIN I3 ON I2.ID = I3.ID;
-- OUTPUT will have a key column named I2_ID.
```

The key column can be given a new name, if required, by defining an alias in the projection. For example:

```sql
CREATE TABLE OUTPUT AS
   SELECT I2.ID AS ID, I1.V0, I2.V0, I3.V0 FROM I1 JOIN I2 ON abs(I1.ID) = I2.ID JOIN I3 ON I2.ID = I3.ID;
-- OUTPUT will have a key column named ID.
```

2. For FULL OUTER joins and other joins where the join criteria is not on column references, the key column in the output is not equivalent to any column from any source. The key column will have a system generated name in the form `KSQL_COL_n`, where `n` is a positive integer. For example:

```sql
CREATE TABLE OUTPUT AS
    SELECT * FROM I1 FULL OUTER JOIN I2 ON I1.ID = I2.ID;
-- OUTPUT will have a key column named KSQL_COL_0, or similar.
```
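
The two JOIN rules can be summarised in a short sketch, assuming a `SELECT *` projection so that the key name takes the `<source>_<column>` form shown in the first example. The function `join_key_name` and its input shape are hypothetical: each criterion is a `(source, column)` pair listed left to right, with `column` set to `None` when that side of the join is an expression rather than a column reference.

```python
def join_key_name(join_types, criteria, n=0):
    """Name of the key column of a join result under SELECT *.

    join_types -- e.g. ["INNER", "INNER"], one entry per JOIN clause
    criteria   -- [(source_alias, column_or_None), ...] from left to right
    """
    if all(t in ("INNER", "LEFT OUTER") for t in join_types):
        for source, column in criteria:
            if column is not None:
                # Rule 1: leftmost source joined on a column reference.
                return f"{source}_{column}"
    # Rule 2: FULL OUTER joins, or no column reference in the criteria.
    return f"KSQL_COL_{n}"


# I1 JOIN I2 ON abs(I1.ID) = I2.ID JOIN I3 ON I2.ID = I3.ID
print(join_key_name(["INNER", "INNER"],
                    [("I1", None), ("I2", "ID"), ("I3", "ID")]))  # I2_ID

# I1 FULL OUTER JOIN I2 ON I1.ID = I2.ID
print(join_key_name(["FULL OUTER"], [("I1", "ID"), ("I2", "ID")]))  # KSQL_COL_0
```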

The key column can be given a new name, if required, by defining an alias in the projection. A new UDF called `JOINKEY` has been introduced to help define the alias. It takes the join criteria as its parameters. For example:

```sql
CREATE TABLE OUTPUT AS
    SELECT JOINKEY(I1.ID, I2.ID) AS ID, I1.V0, I2.V0 FROM  I1 FULL OUTER JOIN I2 ON I1.ID = I2.ID;
-- OUTPUT will have a key column named ID.
```

`JOINKEY` will be deprecated in a future release of ksqlDB once multiple key columns are supported.

Co-authored-by: Andy Coates <[email protected]>