Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support window bound columns in selection of windowed group by #4450

Conversation

big-andy-coates
Copy link
Contributor

@big-andy-coates big-andy-coates commented Feb 5, 2020

Description

Fixes bits of #4397

KSQL currently lets you take a non-windowed stream and perform a windowed group by:

CREATE TABLE T as SELECT stuff FROM S WINDOW TUMBLING (SIZE 1 SECOND) group by something;

Which is essentially grouping by not just something, but also implicitly by the window bounds.

This might be more correctly written with a Tumbling table function:

CREATE TABLE T as SELECT stuff FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;

Where the Tumbling table function returns one row for each row in S, with the addition of the windowstart and windowend columns. (Note: Hopping and session table functions are also possible, though in the case of the latter the table function would also emit retractions).

In a correct SQL model windowstart and windowend would therefore be available as fields within the selection, e.g.

CREATE TABLE T as SELECT windowstart, windowend, something, count() FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;

This change makes such awesomeness possible.

How to review.

The changes, TBH, feel a little hacky. They're too spread around the code base. However, without some restructuring there's little choice but to hack the window bounds in. The planned structured keys work will improve things, and we probably need to rethink windowing at some point. That said, reviewing notes are:

  1. QueryAnalyser has been enhanced to also pass WHERE expressions to the aggregate analyser, so that it can throw on any window bounds columns.
  2. AggregateAnalyzer has been enhanced to throw if the window bound columns are used anywhere but in the select of windowed group by statements. (We'll need a Streams change to support anything else).
  3. Analysis.getFromSourceSchemas now takes a boolean flag to indicate if the returned schemas should, for windowed group bys, include the window bounds columns. Basically, the window bound columns are only available post the aggregation. A lot of the code passes true to get the schema with the window bounds in it, and relying on the fact that the AggregateAnalyzer has already rejected anything invalid.
  4. AggregateNode and LogicalPlanner have some magic to handle the special window bounds columns, where appropriate.
  5. AggregateParamsFactory now takes a flag to indicate a windowed group by, and adds appropriate window bounds columns to the result schema.
  6. StreamAggregateBuilder adds a new transformValues step to populate the window bounds values from the windowed key.

Note: there are some expected topology changes, but these will mostly be reverted by a follow on PR and are also backwards compatible. The topologies also include a dubiously named Aggregate-Aggregate-WindowSelect2 node. This will also go in the follow up PR.

Testing done

Suitable tests added for pull, push and persistent queries.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

KSQL currently lets you take a non-windowed stream and perform a windowed group by:

```sql
CREATE TABLE T as SELECT stuff FROM S WINDOW TUMBLING (SIZE 1 SECOND) group by something;
```

Which is essentially grouping by not just `something`, but also implicitly by the window bounds.

This might be more correctly written with a Tumbling table function:

```sql
CREATE TABLE T as SELECT stuff FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;
```

Where the Tumbling table function returns one row for each row in `S`, with the addition of the `windowstart` and `windowend` columns.   (Note: Hopping and session table functions are also possible, though in the case of the latter the table function would also emit retractions).

In a correct SQL model `windowstart` and `windowend` would therefore be available as fields within the selection, e.g.

```
CREATE TABLE T as SELECT windowstart, windowend, something, count() FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;
```

This change makes such awesomeness possible.
@rodesai rodesai self-assigned this Feb 5, 2020
@big-andy-coates big-andy-coates merged commit 3c1114c into confluentinc:master Feb 6, 2020
@big-andy-coates big-andy-coates deleted the wnd_bounds_in_windowed_group_by branch February 6, 2020 16:10
@pkgonan
Copy link
Contributor

pkgonan commented Aug 2, 2021

@big-andy-coates @rodesai
Hi. This feature seems to be not work at confluent ksqldb 6.2.0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants