Fix PARTITION BY semantics #4749

Closed
big-andy-coates opened this issue Mar 10, 2020 · 11 comments · Fixed by #4816

@big-andy-coates
Contributor

big-andy-coates commented Mar 10, 2020

Old style query semantics for partition by are:

S1: ROWKEY => B, C. (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;

S2: ROWKEY => B, C.

As you can see, the schema of S2 is still the same. However, the old value of the key has been lost, as it has been overwritten with the value from B.

With the new primitive key work to remove the restriction on key column naming, the same query semantics do not work, e.g.:
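
To make the data loss concrete, here is a minimal Python sketch of the old behaviour applied to a single record. The `Record` type and the `old_partition_by` function are hypothetical names used only for illustration; they are not part of ksql.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class Record:
    """Illustrative model of a stream record: one key plus named value columns."""
    key: Any
    value: Dict[str, Any]


def old_partition_by(record: Record, column: str) -> Record:
    """Model of the OLD semantics: the key is overwritten, the value is untouched."""
    # The previous key is simply discarded; nothing copies it into the value.
    return Record(key=record.value[column], value=dict(record.value))


s1_row = Record(key=10, value={"B": 20, "C": 30})
s2_row = old_partition_by(s1_row, "B")
print(s2_row)
```

After the call, the original key `10` appears nowhere in `s2_row`, and the new key duplicates the `B` value column.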

S1: A => B, C

CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;

S2: B => B, C

This fails as the B value column clashes with the B key column! Hence we need to fix the query semantics to remove B from the value schema when doing the PARTITION BY.
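
The clash can be illustrated with a small Python sketch that checks a schema for names used in both the key and the value. `find_clashes` is a hypothetical helper written for this example, not a ksql function.

```python
from typing import List


def find_clashes(key_columns: List[str], value_columns: List[str]) -> List[str]:
    """Return the column names that appear in both the key and the value."""
    value_names = set(value_columns)
    return [name for name in key_columns if name in value_names]


# Old naming: the key is always called ROWKEY, so the names cannot collide.
rowkey_clashes = find_clashes(["ROWKEY"], ["B", "C"])

# Arbitrary key names combined with the old semantics: S2 would be B => B, C.
named_key_clashes = find_clashes(["B"], ["B", "C"])

print(rowkey_clashes, named_key_clashes)
```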

It doesn't seem right to 'fix' the query semantics to remove B without also fixing them to add A to the value schema, i.e. the schema should be:

S2: B => A, C

@big-andy-coates
Contributor Author

I would have liked to decouple fixing the query semantics from the primitive key work. However, it looks like some of the work will need to be done earlier than anticipated.

@apurvam
Contributor

apurvam commented Mar 11, 2020

cc @derekjn @MichaelDrogalis

It seems reasonable to say that we need #3536 as a prerequisite to supporting JSON/PB/Avro key formats. But then we get the problem described above. And that means the scope becomes larger, as we get into changing query semantics and all the compatibility implications that brings, especially for hosted flavors of KSQL.

Would be good to brainstorm on how to thread this needle. Is it a reasonable step to drop the dependency on #3536 from the product point of view?

@MichaelDrogalis
Contributor

MichaelDrogalis commented Mar 11, 2020

Derek and I connected offline to talk about it. We think it's smart to avoid accreting technical debt in this area since it’s incredibly fundamental to the project. If we need to enlarge the scope to do it right, we can get in the weeds and help.

@apurvam
Contributor

apurvam commented Mar 11, 2020

Thanks. I think the main thing to decide upon is whether we think #3536 is a real blocker for supporting primitive keys serialized with PB/Avro/JSON. As @big-andy-coates noted in that ticket, without allowing key names other than ROWKEY, a JSON serialized key like {'id': 45} will be used in KSQL as ROWKEY INT KEY. In particular, you can't reference it with the identifier id as you might expect. You also can't create messages with keys named anything other than ROWKEY. Is that a reasonable intermediate product experience?

If the answer to that is 'no', then changing the key name necessitates changing the query semantics, which is going to be much more work.

@derekjn
Contributor

derekjn commented Mar 12, 2020

You also can't create messages with keys named anything other than ROWKEY. Is that a reasonable intermediate product experience?

@apurvam wouldn't you still be able to assign a row key using any column name (via WITH (key=...)), but just not reference it by anything other than ROWKEY?

@mjsax
Member

mjsax commented Mar 12, 2020

I would have liked to decouple fixing the query semantics from the primitive key work. However, it looks like some of the work will need to be done earlier than anticipated.

Not sure to what extent this affects query semantics? In the old model, there is ROWKEY that is stored in the key and <A,B> that is stored in the value; however, we know that either A or B is the same as ROWKEY (i.e., the actual data schema is <A,B> and it does not change between S1 and S2).

Thus, both examples actually use a different schema, <A,B> vs <A,B,C>, and so the example does not compare apples to apples, from my understanding?

With the new semantics, ROWKEY is no longer a copy of a "data schema" column, and thus just switching A and B from <A, <B,C>> to <B, <A,C>> does not sound like a semantic change to me. The schema for S1 and S2 is <A,B,C>. (Thus, I don't think that generic keys are a requirement here, as PARTITION BY a, b is not supported yet?)

In the old code, the corresponding example would be ROWKEY in the key and <A,B,C> in the value, with ROWKEY == A in S1 and ROWKEY == B in S2, while the "data schema" is always <A,B,C>.

Hence, I think we can just implement it without worrying about a semantic change? Let me know if this makes sense or if I am totally off...

@apurvam
Contributor

apurvam commented Mar 13, 2020

@apurvam wouldn't you still be able to assign a row key using any column name (via WITH (key=...)), but just not reference it by anything other than ROWKEY?

I think @big-andy-coates should take that one. But even if you could, I'm not sure how it addresses the usability gaps which have been mentioned.

@derekjn
Contributor

derekjn commented Mar 14, 2020

But even if you could, I'm not sure how it addresses the usability gaps which have been mentioned.

It doesn't, I'm just trying to make sure I understand everything correctly.

@big-andy-coates
Contributor Author

@mjsax

Not sure to what extent this affects query semantics? In the old model, there is ROWKEY that is stored in the key and <A,B> that is stored in the value; however, we know that either A or B is the same as ROWKEY (i.e., the actual data schema is <A,B> and it does not change between S1 and S2).

It's not true that 'we know that either A or B is the same as ROWKEY'. ROWKEY could contain completely different data.

-- Schema: ROWKEY => B, C
-- Neither B nor C duplicates the data stored in ROWKEY.
CREATE STREAM FOO (ROWKEY INT KEY, B INT, C INT) WITH (kafka_topic='x', value_format='json');

-- Schema: ROWKEY => B, C
-- The data stored in the old ROWKEY has been lost and overwritten by B.
CREATE STREAM BAR AS SELECT * FROM FOO PARTITION BY B;

I think you're misunderstanding the problem. You're assuming ROWKEY is duplicated in the value - that's not a requirement and never was for streams.

@mjsax
Member

mjsax commented Mar 16, 2020

Thanks for clarifying.

It seems the old PARTITION BY, which overwrites ROWKEY with the value of B and loses the original ROWKEY, is already "broken". I agree that it should be fixed. Losing a column when PARTITION BY is used does not sound right.

@big-andy-coates
Contributor Author

@derekjn

You also can't create messages with keys named anything other than ROWKEY. Is that a reasonable intermediate product experience?

@apurvam wouldn't you still be able to assign a row key using any column name (via WITH (key=...)), but just not reference it by anything other than ROWKEY?

The WITH(Key='A', ..) functionality requires a copy of the key in the value. For many users this won't be the case, so they'd need to pre-process their data with something other than KSQL to get this.

So, IMHO, this is not a workable solution.

big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 16, 2020
Prep for confluentinc#4749.

This commit changes the way the engine resolves '*' in a projection, e.g. `SELECT * FROM X;`.

Previously, the `Analyzer` was responsible for expanding the `*` into the set of columns of each source. However, this code was getting complicated and would become much more complicated once the key column can have any name (confluentinc#3536). The complexity comes about because the `Analyzer` would need to determine the presence of joins, group bys, partition bys, etc., which can affect how `*` is resolved. This logic duplicates the logic in the `LogicalPlanner` and `PlanNode` sub-classes.

With this commit, the logical plan and planner become responsible for resolving any `*` in the projection. This is achieved by asking the parent of the projection node to resolve the `*` into the set of columns. Parent node types that do not know how to resolve the `*`, e.g. `FilterNode`, forward requests to their parents. In this way, the resolution request ripples up the logical plan until it reaches a `DataSourceNode`, which can resolve the `*` into a list of columns. `JoinNode` knows how to forward `*`, `left.*` and `right.*` appropriately.

Previously, the list of `SelectExpressions` was passed down from parent `PlanNode` to child, allowing some nodes to rewrite the expressions. For example, `FlatMapNode` would rewrite any expression involving a TableFunction to use the internal names like `KSQL_SYNTH_0`.

With this commit this is no longer necessary. Instead, when building a projection node the planner asks its parent node to resolve any selects, allowing the parent to perform any rewrite.

At the moment, the planner is still responsible for much of this work. In the future, this logic may move into the plan itself. However, such a change would increase the complexity of this commit.
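
The forwarding scheme described in this commit message can be sketched in a few lines of Python. This is an illustration of the idea only: the classes reuse the node names from the message, but the `parent` attribute, the `resolve_select_star` method, its `source_name` parameter and the alias-qualified column names are all assumptions made for the example, not ksql's actual API.

```python
from typing import List, Optional


class PlanNode:
    """Hypothetical stand-in for a logical plan node."""

    def __init__(self, parent: Optional["PlanNode"] = None) -> None:
        self.parent = parent

    def resolve_select_star(self, source_name: Optional[str] = None) -> List[str]:
        # Default behaviour: this node cannot resolve '*', so forward the request.
        if self.parent is None:
            raise ValueError("no node in the plan could resolve '*'")
        return self.parent.resolve_select_star(source_name)


class DataSourceNode(PlanNode):
    """End of the chain: knows its own columns."""

    def __init__(self, alias: str, columns: List[str]) -> None:
        super().__init__()
        self.alias = alias
        self.columns = columns

    def resolve_select_star(self, source_name: Optional[str] = None) -> List[str]:
        # A qualified request such as 'R.*' only matches the source named R.
        if source_name is not None and source_name != self.alias:
            return []
        return [f"{self.alias}.{column}" for column in self.columns]


class JoinNode(PlanNode):
    """Forwards '*' to both sides; a qualified star matches only one of them."""

    def __init__(self, left: PlanNode, right: PlanNode) -> None:
        super().__init__()
        self.left = left
        self.right = right

    def resolve_select_star(self, source_name: Optional[str] = None) -> List[str]:
        return (self.left.resolve_select_star(source_name)
                + self.right.resolve_select_star(source_name))


class FilterNode(PlanNode):
    """Does not know how to resolve '*'; inherits the forwarding behaviour."""


class ProjectNode(PlanNode):
    def select_star(self, source_name: Optional[str] = None) -> List[str]:
        return self.parent.resolve_select_star(source_name)


join = JoinNode(DataSourceNode("L", ["ID", "A"]), DataSourceNode("R", ["ID", "B"]))
project = ProjectNode(parent=FilterNode(parent=join))
all_columns = project.select_star()
right_columns = project.select_star("R")
print(all_columns, right_columns)
```

Because each node only needs to know whether it can answer the request or must pass it on, the projection does not have to inspect the rest of the plan for joins or filters.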
@big-andy-coates big-andy-coates mentioned this issue Mar 16, 2020
2 tasks
big-andy-coates added a commit that referenced this issue Mar 18, 2020
* refactor: projection expression handling


Co-authored-by: Andy Coates <[email protected]>
big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 18, 2020
Fixes: confluentinc#4749

##### Background

This change fixes an issue with our repartition semantics.

Old style query semantics for partition by are broken:

S1: ROWKEY => B, C  (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: ROWKEY => B, C

As you can see, the schema of S2 is still the same. However, the old data in the key has been lost, as it has been overwritten with the data from B, and the key now duplicates the data stored in B.

This loss of data on a `SELECT * .. PARTITION BY` needed fixing.

Secondly, with the new primitive key [work to remove the restriction on key column naming](confluentinc#3536), the same query semantics do not work, e.g.:

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: B => B, C

This fails as the `B` value column clashes with the `B` key column!

##### The fix

This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column sees the old key column being moved to the value and the new key column being moved from the value to the key. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

Results in the schema: S2: B => C, A.
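
As a rough model of the new behaviour for a plain column reference, the Python sketch below swaps the columns at the schema level. The `partition_by_column` function is hypothetical, and appending the old key to the end of the value columns is an assumption taken from the `B => C, A` example above. Partitioning by a column that is already the key is not covered.

```python
from typing import List, Tuple

# A schema is modelled as (key column names, value column names).
Schema = Tuple[List[str], List[str]]


def partition_by_column(schema: Schema, column: str) -> Schema:
    """Move `column` from the value to the key and the old key into the value."""
    key_columns, value_columns = schema
    if column not in value_columns:
        raise ValueError(f"{column} is not a value column")
    # Drop the new key column from the value, then append the old key column(s).
    remaining = [name for name in value_columns if name != column]
    return [column], remaining + list(key_columns)


s1: Schema = (["A"], ["B", "C"])
s2 = partition_by_column(s1, "B")
print(s2)
```

No column is lost and no name appears in both the key and the value, which addresses both problems described in the background section.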

If a PARTITION BY is an expression other than a column reference, then ksql will synthesize a new column name. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT);
```

Results in the schema: S2: KSQL_COL_0 => B, C, A.
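
At the record level, the expression case can be sketched as follows. This is an illustrative Python model only: `partition_by_expression` is a hypothetical function, the key name is fixed to `KSQL_COL_0` to match the example above (how ksql picks the name in general is not described here), and the expression is passed as a Python callable standing in for `CAST(B AS INT)`.

```python
from typing import Any, Callable, Dict, Tuple


def partition_by_expression(
    key: Any,
    value: Dict[str, Any],
    expression: Callable[[Dict[str, Any]], Any],
    old_key_name: str = "A",
    new_key_name: str = "KSQL_COL_0",  # assumed name, taken from the example
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Compute a new key from the value and keep every original column."""
    new_value = dict(value)
    # The old key moves into the value; no value column is removed because
    # the new key is derived from an expression, not taken from a column.
    new_value[old_key_name] = key
    return {new_key_name: expression(value)}, new_value


new_key, new_value = partition_by_expression(
    key=1,
    value={"B": "42", "C": 3},
    expression=lambda row: int(row["B"]),  # stands in for CAST(B AS INT)
)
print(new_key, new_value)
```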

[This GitHub issue](confluentinc#4813) will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.
big-andy-coates added a commit that referenced this issue Mar 20, 2020
* fix: fix repartition semantics

Fixes: #4749



Co-authored-by: Andy Coates <[email protected]>
big-andy-coates added a commit that referenced this issue Mar 23, 2020
* fix: fix repartition semantics

Fixes: #4749


* fix: do not filter out rows where PARTITION BY resolves to null

Fixes: #4747

This commit removes the filter that was excluding any rows where the `PARTITION BY` clause resolved to a `null` value, i.e. either because the result was `null` or because an error occurred evaluating the expression.

This change will only affect new queries started. Pre-existing queries will continue to run as before.
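
The effect of removing the filter can be sketched with a small Python model. The `repartition` function and its `filter_nulls` flag are hypothetical, and treating any exception as a `null` key is a simplification of "an error occurred evaluating the expression".

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def repartition(
    rows: List[Row],
    key_expression: Callable[[Row], Any],
    filter_nulls: bool,
) -> List[Tuple[Optional[Any], Row]]:
    """Re-key rows; optionally drop those whose new key resolves to None."""
    output = []
    for row in rows:
        try:
            key = key_expression(row)
        except Exception:
            # Simplification: an evaluation error is treated as a null key.
            key = None
        if key is None and filter_nulls:
            continue  # previous behaviour: the row is silently dropped
        output.append((key, row))
    return output


rows = [{"B": 1}, {"B": None}, {}]  # the last row makes the expression fail
before = repartition(rows, lambda row: row["B"], filter_nulls=True)
after = repartition(rows, lambda row: row["B"], filter_nulls=False)
print(len(before), len(after))
```

With the filter removed, all three rows reach the output, two of them with a `null` key.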

* docs: call out limitation of partition by NULL
@big-andy-coates big-andy-coates self-assigned this Mar 30, 2020

5 participants