CP 5.5 JOIN, GROUP BY and WINDOW do not work together #5898

Closed
hjafarpour opened this issue Jul 29, 2020 · 13 comments

@hjafarpour
Contributor

Describe the bug
It seems we have a regression in CP 5.5. The following query runs in CP 5.4, but I get the error shown below when I use CP 5.5:

    CREATE TABLE test AS SELECT
       p.userid,
       COUNT(*) as t_count
    FROM  PAGEVIEWS_ORIGINAL p
    LEFT JOIN USERS_ORIGINAL u ON p.userid=u.userid
    WINDOW TUMBLING (SIZE 1 MINUTE)
    GROUP BY p.userid
    HAVING COUNT(*) > 2;

To Reproduce
Run the above query in the Pageview demo context on both CP 5.4 and CP 5.5.1.

Expected behavior
On CP 5.4, the query runs as expected.

Actual behaviour
On CP 5.5.1 the following error message is shown:

    ksql> CREATE TABLE test AS SELECT p.userid, COUNT(*) as t_count
    >FROM  PAGEVIEWS_ORIGINAL p
    >LEFT JOIN USERS_ORIGINAL u ON p.userid=u.userid
    >WINDOW TUMBLING (SIZE 1 MINUTE)
    >GROUP BY p.userid
    >HAVING COUNT(*) > 2;
    expected one element but was: <io.confluent.ksql.analyzer.Analysis$AliasedDataSource@2b97286d, io.confluent.ksql.analyzer.Analysis$AliasedDataSource@4d7cd0ac>
    ksql>


@agavra
Contributor

agavra commented Jul 29, 2020

Hmm, I can't repro on master. The following QTT passes:

    {
      "name": "count and having and join",
      "statements": [
        "CREATE STREAM pageviews (userID INT) WITH (kafka_topic='pageviews', value_format='JSON');",
        "CREATE TABLE users (userID INT PRIMARY KEY, col1 VARCHAR) WITH (kafka_topic='users', value_format='JSON');",
        "CREATE TABLE out AS SELECT p.userID, count(*) AS count FROM pageviews p LEFT JOIN users u ON p.userID = u.userID WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY p.userid HAVING COUNT(*) > 2;"
      ],
      "inputs": [
        {"topic": "users", "key": 1, "value": {"col1": "one"}},
        {"topic": "pageviews", "value": {"userID": 1}},
        {"topic": "pageviews", "value": {"userID": 1}},
        {"topic": "pageviews", "value": {"userID": 1}}
      ],
      "outputs": [
        {"topic": "OUT", "key": 1, "value": null, "window": {"start": 0, "end": 60000, "type": "time"}},
        {"topic": "OUT", "key": 1, "value": null, "window": {"start": 0, "end": 60000, "type": "time"}},
        {"topic": "OUT", "key": 1, "value": {"COUNT": 3}, "window":  {"start": 0, "end": 60000, "type": "time"}}
      ]
    }
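The two `null` values before `{"COUNT": 3}` in the expected outputs above appear to come from the `HAVING COUNT(*) > 2` clause: each update to the aggregate is emitted, and an update whose count fails the predicate is emitted as a tombstone rather than a row. The sketch below is a simplified in-memory model of that behavior, assuming all three pageviews land in the same window (so windowing is ignored). `HavingSketch`, `Output`, and `countWithHaving` are hypothetical names for illustration, not ksqlDB classes; records require Java 16+.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of COUNT(*) ... GROUP BY key HAVING COUNT(*) > threshold.
// Not ksqlDB's implementation: windowing and serialization are ignored.
public class HavingSketch {

    // One changelog record: the grouping key and its value (null = tombstone).
    record Output(String key, Long count) {}

    // Counts events per key and emits one record per input event. When the
    // updated count fails the HAVING predicate, a tombstone is emitted instead.
    static List<Output> countWithHaving(List<String> keys, long threshold) {
        Map<String, Long> counts = new HashMap<>();
        List<Output> outputs = new ArrayList<>();
        for (String key : keys) {
            long count = counts.merge(key, 1L, Long::sum);
            outputs.add(new Output(key, count > threshold ? Long.valueOf(count) : null));
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Three pageviews for userID 1, as in the QTT inputs.
        for (Output o : countWithHaving(List.of("1", "1", "1"), 2)) {
            System.out.println(o);
        }
    }
}
```

Run on the three inputs, the model emits two tombstones followed by a count of 3, mirroring the shape of the `outputs` section of the test.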

I will try on 5.5.

@agavra
Contributor

agavra commented Jul 29, 2020

Hmm, I can't repro on 5.5 either. The bug might be environmental. What other commands did you run?

@hjafarpour
Contributor Author

Here are the statements I ran. The data came from the datagen quickstarts (pageviews, users):

    CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar)
    WITH (kafka_topic='pageviews', value_format='json');

    CREATE TABLE users_original(registertime bigint, userid varchar, regionid varchar, gender varchar)
     WITH (kafka_topic='users', value_format='json', key = 'userid');

    CREATE TABLE test AS SELECT p.userid, COUNT(*) as t_count FROM  PAGEVIEWS_ORIGINAL p LEFT JOIN USERS_ORIGINAL u ON p.userid=u.userid WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY p.userid HAVING COUNT(*) > 2;

I'm using CP 5.5.1 for both the server and the CLI.

@agavra agavra self-assigned this Jul 29, 2020
@agavra
Contributor

agavra commented Jul 29, 2020

OK, confirmed that I can reproduce this on 5.5.1 but not on master. I'll triage the impact and report back :)

    {
      "name": "count and having and join",
      "statements": [
        "CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='json');",
        "CREATE TABLE users_original(registertime bigint, userid varchar, regionid varchar, gender varchar) WITH (kafka_topic='users', value_format='json', key='userid');",
        "CREATE TABLE test AS SELECT p.userid, COUNT(*) as t_count FROM  PAGEVIEWS_ORIGINAL p LEFT JOIN USERS_ORIGINAL u ON p.userid=u.userid WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY p.userid HAVING COUNT(*) > 2;"
      ],
      "inputs": [
        {"topic": "users", "key": "1", "value": {"regionid": "one"}},
        {"topic": "pageviews", "value": {"userID": "1"}},
        {"topic": "pageviews", "value": {"userID": "1"}},
        {"topic": "pageviews", "value": {"userID": "1"}}
      ],
      "outputs": [
        {"topic": "TEST", "key": "1", "value": null, "window": {"start": 0, "end": 60000, "type": "time"}},
        {"topic": "TEST", "key": "1", "value": null, "window": {"start": 0, "end": 60000, "type": "time"}},
        {"topic": "TEST", "key": "1", "value": {"COUNT": 3}, "window":  {"start": 0, "end": 60000, "type": "time"}}
      ]
    }

@agavra agavra changed the title Regression between CP 5.4 and CP 5.5 CP 5.5 GROUP BY and WINDOW do not work together Jul 29, 2020
@agavra
Contributor

agavra commented Jul 29, 2020

The bug happens here (QueryAnalyzer:L261):

    final AliasedDataSource source = Iterables.getOnlyElement(analysis.getFromDataSources());

It expects a GROUP BY query to have only one source in its FROM clause, but a join has more than one, so `getOnlyElement` throws.
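To illustrate why a join trips this line: Guava's `Iterables.getOnlyElement` returns the sole element of an iterable and throws `IllegalArgumentException` when there is more than one, which matches the `expected one element but was: <...>` message in the original report. The sketch below reproduces that contract without depending on Guava. `OnlyElementSketch`, `onlyElement`, and `AliasedSource` are hypothetical stand-ins (not ksqlDB or Guava classes), the message format only approximates Guava's, and the sketch does not show how the eventual fix changed `QueryAnalyzer`. Records require Java 16+.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class OnlyElementSketch {

    // Hypothetical stand-in for a FROM-clause source such as "PAGEVIEWS_ORIGINAL p".
    record AliasedSource(String name, String alias) {}

    // Mimics the getOnlyElement contract: exactly one element, otherwise an exception.
    static <T> T onlyElement(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException();
        }
        T first = it.next();
        if (!it.hasNext()) {
            return first;
        }
        // More than one element: list them all in the error message.
        StringBuilder msg = new StringBuilder("expected one element but was: <").append(first);
        while (it.hasNext()) {
            msg.append(", ").append(it.next());
        }
        throw new IllegalArgumentException(msg.append('>').toString());
    }

    public static void main(String[] args) {
        AliasedSource p = new AliasedSource("PAGEVIEWS_ORIGINAL", "P");
        AliasedSource u = new AliasedSource("USERS_ORIGINAL", "U");

        // Non-join query: a single FROM source, so the call succeeds.
        System.out.println(onlyElement(List.of(p)));

        // Join query: two FROM sources, so the call throws.
        try {
            onlyElement(List.of(p, u));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```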

@agavra agavra changed the title CP 5.5 GROUP BY and WINDOW do not work together CP 5.5 JOIN, GROUP BY and WINDOW do not work together Jul 29, 2020
@agavra agavra removed their assignment Jul 29, 2020
@agavra
Contributor

agavra commented Jul 30, 2020

Fixing this is non-trivial if we also want to be able to select WINDOWSTART and WINDOWEND. A simple fix would allow the query in the original issue to run, but the test below is more comprehensive:

    {
      "name": "aggregate windowed join",
      "statements": [
        "CREATE STREAM A (ROWKEY VARCHAR KEY, col1 VARCHAR) WITH (kafka_topic='a', value_format='JSON');",
        "CREATE TABLE B (ROWKEY VARCHAR KEY, col1 VARCHAR) WITH (kafka_topic='b', value_format='JSON');",
        "CREATE TABLE C AS SELECT A.ROWKEY, collect_list(A.COL1), collect_list(B.COL1), WINDOWSTART as WSTART, WINDOWEND AS WEND FROM A JOIN B on A.ROWKEY = B.ROWKEY WINDOW TUMBLING (SIZE 10 MILLISECONDS) GROUP BY a.ROWKEY;"
      ],
      "inputs": [
        {"topic": "b", "key": "1", "value": {"col1": "B1"}},
        {"topic": "a", "key": "1", "value": {"col1": "A1"}},
        {"topic": "a", "key": "1", "value": {"col1": "A2"}},
        {"topic": "b", "key": "1", "value": {"col1": "B2"}},
        {"topic": "a", "key": "1", "value": {"col1": "A3"}},
        {"topic": "a", "key": "1", "value": {"col1": "A4"}, "timestamp": 12}
      ],
      "outputs": [
        {"topic": "C", "key": "1", "value": {"A_ROWKEY": "1", "KSQL_COL_1": ["A1"], "KSQL_COL_2": ["B1"]}, "window": {"start": 0, "end": 10, "type": "time"}},
        {"topic": "C", "key": "1", "value": {"A_ROWKEY": "1", "KSQL_COL_1": ["A1", "A2"], "KSQL_COL_2": ["B1", "B1"]}, "window": {"start": 0, "end": 10, "type": "time"}},
        {"topic": "C", "key": "1", "value": {"A_ROWKEY": "1", "KSQL_COL_1": ["A1", "A2", "A3"], "KSQL_COL_2": ["B1", "B1", "B2"]}, "window": {"start": 0, "end": 10, "type": "time"}},
        {"topic": "C", "key": "1", "value": {"A_ROWKEY": "1", "KSQL_COL_1": ["A4"], "KSQL_COL_2": ["B2"]}, "window": {"start": 10, "end": 20, "type": "time"}}
      ]
    }

Note: the expected output here isn't correct. The window bounds (WSTART, WEND) should be present, but I was planning to add them later.
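The window bounds mentioned in the note follow from tumbling-window arithmetic: a record with timestamp `ts` falls in the window starting at the largest multiple of the window size that is not greater than `ts`, and that window ends (exclusively, as in Kafka Streams time windows) one size later. With `SIZE 10 MILLISECONDS`, A4's timestamp of 12 lands in `[10, 20)`, while the records without an explicit timestamp appear, judging from the expected outputs, to fall in `[0, 10)`. The sketch below is a hypothetical helper for illustration, not a ksqlDB or Kafka Streams API.

```java
public class TumblingWindowSketch {

    // Start (inclusive) of the tumbling window containing timestampMs.
    // Math.floorMod keeps the result correct for negative timestamps too.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // End (exclusive) of that window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 10; // WINDOW TUMBLING (SIZE 10 MILLISECONDS)
        for (long ts : new long[] {0, 12}) {
            System.out.printf("ts=%d -> [%d, %d)%n", ts, windowStart(ts, size), windowEnd(ts, size));
        }
    }
}
```

The same arithmetic with a size of 60000 ms gives the `{"start": 0, "end": 60000}` windows in the earlier `SIZE 1 MINUTE` tests.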

@agavra agavra added this to the 0.12.0 milestone Jul 30, 2020
@apurvam
Contributor

apurvam commented Jul 30, 2020

@agavra, just to confirm: this is no longer an issue on master, and the ask is only to fix it on CP 5.5 / ksql 0.7?

@agavra
Contributor

agavra commented Jul 30, 2020

I believe so. I haven't tried the full QTT test (including selecting the window bounds), but Hojjat's example works on master.

@big-andy-coates
Contributor

Hmm... let me take a look.

@big-andy-coates
Contributor

The test case you added, @agavra, where the window bounds are in the projection, doesn't work on master, but it also doesn't work on 5.5 or 5.4. So this part is not a regression, just a bug. Breaking it out into a new bug: #5931.

The version without the window bounds in the projection does work in 5.4, 6.0 and master, and fails in 5.5 (as @agavra says above).

big-andy-coates added a commit that referenced this issue Aug 3, 2020
fixes: #5898

This change fixes a regression introduced in v5.5.0 that meant any windowed aggregation with a join would fail with an `IllegalArgumentException`.

This change fixes the regression; however, follow-on work is required to allow access to the window bounds columns `WINDOWSTART` and `WINDOWEND` in such queries. Access to these columns was not possible in v5.4 either, so this part is not a regression. The follow-on work will be tracked under #5931.
@big-andy-coates big-andy-coates modified the milestones: 0.12.0, 5.5.2 Aug 3, 2020
@big-andy-coates
Contributor

Fixed by commit 9452ab6.

@agavra
Copy link
Contributor

agavra commented Aug 3, 2020

Reviewed 9452ab6 - LGTM

@big-andy-coates big-andy-coates modified the milestones: 5.5.2, 0.12.0 Aug 3, 2020
@big-andy-coates
Contributor

Fixed in the upcoming 5.5.2 and 6.0.0 releases and the 0.12 community release.
