Avoid output null values when HAVING clauses not met #3558

Closed
big-andy-coates opened this issue Oct 11, 2019 · 9 comments · Fixed by #6405
Assignees
Labels
P1 Slightly lower priority to P0 ;) requires-streams-change
Milestone

Comments

@big-andy-coates
Contributor

e.g. given this test case:

{
      "name": "with having expression (stream->table)",
      "statements": [
        "CREATE STREAM TEST (f1 INT) WITH (kafka_topic='test_topic', KEY='f1', value_format='DELIMITED');",
        "CREATE TABLE OUTPUT AS SELECT f1, COUNT(*) FROM TEST GROUP BY f1 HAVING SUM(f1) > 1;"
      ],
      "inputs": [
        {"topic": "test_topic", "key": "1", "value": "1"},
        {"topic": "test_topic", "key": "2", "value": "2"},
        {"topic": "test_topic", "key": "1", "value": "1"},
        {"topic": "test_topic", "key": "2", "value": "2"},
        {"topic": "test_topic", "key": "3", "value": "3"}
      ],
      "outputs": [
        {"topic": "OUTPUT", "key": "1", "value": null},
        {"topic": "OUTPUT", "key": "2", "value": "2,1"},
        {"topic": "OUTPUT", "key": "1", "value": "1,2"},
        {"topic": "OUTPUT", "key": "2", "value": "2,2"},
        {"topic": "OUTPUT", "key": "3", "value": "3,1"}
      ]
    }

Notice the null value output for the first input row. This is because the sum of the aggregate after the first row isn't high enough to pass the HAVING criteria.

The root cause is that the HAVING criteria are implemented using a KTable::filter call, which is stateless. The filter cannot tell whether an earlier value for the key ever passed, so whenever a value fails the predicate it must emit a tombstone to clear any output row that might exist.

The fix belongs in Kafka Streams (KS): turn on KTableImpl::enableSendingOldValues for any filter that is downstream of a state store. The filter then receives a CDC event, (old, new), rather than just (new), so it can send a tombstone only when the change transitions from the old value passing the filter to the new value not passing.
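The difference between the two filter behaviours can be sketched with a small simulation of the test case above. This is plain Python, not the Kafka Streams API: `run_having` and its `send_old_values` flag are hypothetical names that only mimic the effect of the filter seeing (old, new) instead of just (new).

```python
from typing import Dict, List, Optional, Tuple

# (key, value); a value of None stands for a tombstone.
Record = Tuple[str, Optional[str]]


def run_having(inputs: List[Tuple[str, int]],
               threshold: int,
               send_old_values: bool) -> List[Record]:
    """Simulate SELECT f1, COUNT(*) ... GROUP BY f1 HAVING SUM(f1) > threshold."""
    # key -> (count, sum); stands in for the aggregate's state store.
    state: Dict[str, Tuple[int, int]] = {}
    out: List[Record] = []
    for key, f1 in inputs:
        old = state.get(key)
        count, total = old if old is not None else (0, 0)
        new = (count + 1, total + f1)
        state[key] = new

        new_passes = new[1] > threshold
        old_passed = old is not None and old[1] > threshold

        if new_passes:
            out.append((key, f"{key},{new[0]}"))
        elif not send_old_values or old_passed:
            # Without the old value the filter must always emit a tombstone.
            # With it, a tombstone is emitted only on a pass -> fail transition.
            out.append((key, None))
    return out


inputs = [("1", 1), ("2", 2), ("1", 1), ("2", 2), ("3", 3)]
without_old = run_having(inputs, threshold=1, send_old_values=False)
with_old = run_having(inputs, threshold=1, send_old_values=True)
```

Under these assumptions, `without_old` reproduces the five outputs of the test case, including the leading null for key `1`, while `with_old` contains only the four non-null records.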

@vector1983

@big-andy-coates Any workaround for this? I keep seeing this ERROR log in KSQL:
ERROR Invalid format: 10.121.8.11 : null (io.confluent.ksql.structured.SqlPredicate)

@big-andy-coates
Contributor Author

> @big-andy-coates Any workaround for this? I keep seeing this ERROR log in KSQL:
> ERROR Invalid format: 10.121.8.11 : null (io.confluent.ksql.structured.SqlPredicate)

Hi @vector1983, I'm not sure that's the same issue!

There's no workaround at the moment that I'm aware of.

@Deninc

Deninc commented May 12, 2020

+1 to fix this
It's very confusing, because when we test print in KSQL-CLI it shows only non-null results. But in the actual topics it includes all the nulls. It should be at least explicit and consistent.

@vvcephei
Member

Hey @big-andy-coates, do you mind creating an AK Jira ticket to track this:

> The fix is actually a fix in KS, which is to turn on KTableImpl::enableSendingOldValues for any filter that is downstream of a statestore.

Thanks!

@big-andy-coates
Contributor Author

Added https://issues.apache.org/jira/browse/KAFKA-10077 to track KS work.

@vvcephei
Member

vvcephei commented Jun 1, 2020

Thanks!

@big-andy-coates
Contributor Author

This will also affect users with WHERE clauses (not just HAVING clauses), e.g.

CREATE TABLE KT_THERAPISTS AS SELECT
    ID AS THERAPIST_ID
FROM KT_USERS
WHERE ID = 100073;
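A rough sketch of why a WHERE clause on a table hits the same problem: every source row that fails the predicate produces a tombstone unless the filter can see the old row. The changelog below is hypothetical (only ID 100073 comes from the query above), and `filter_table` is an illustrative Python helper, not a ksqlDB or Kafka Streams call.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, int]
Change = Tuple[int, Optional[Row]]  # (key, row); a row of None is a delete


def filter_table(changelog: List[Change], send_old_values: bool) -> List[Change]:
    """Simulate SELECT ID AS THERAPIST_ID FROM KT_USERS WHERE ID = 100073."""

    def passes(row: Optional[Row]) -> bool:
        return row is not None and row["ID"] == 100073

    source: Dict[int, Row] = {}  # current contents of the source table
    out: List[Change] = []
    for key, row in changelog:
        old = source.get(key)
        if row is None:
            source.pop(key, None)
        else:
            source[key] = row

        if passes(row):
            out.append((key, {"THERAPIST_ID": row["ID"]}))
        elif not send_old_values or passes(old):
            # Stateless filter: always emit. With old values: only when the
            # previous row was actually in the result.
            out.append((key, None))
    return out


# Hypothetical source changelog: three upserts, then a delete of 100073.
changelog: List[Change] = [
    (100071, {"ID": 100071}),
    (100073, {"ID": 100073}),
    (100074, {"ID": 100074}),
    (100073, None),
]
stateless = filter_table(changelog, send_old_values=False)
with_old = filter_table(changelog, send_old_values=True)
```

In this sketch the stateless variant emits a tombstone for 100071 and 100074 even though neither was ever in the result, while the variant with old values emits only the matching row and the one tombstone that is actually needed.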

@stevenpyzhang
Member

Adding the needs-triage label to discuss whether we should include this in 0.12.0 or 0.13.0.

@big-andy-coates
Contributor Author

Upstream Kafka Streams work for this is now complete: apache/kafka#9156

big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Oct 12, 2020
big-andy-coates added a commit that referenced this issue Oct 12, 2020
* feat: avoid spurious tombstones in table output

fixes: #3558

AK commit apache/kafka#9156 enhances Kafka Streams so that filters on tables now avoid emitting spurious tombstones. ksqlDB now benefits from this: tombstones are no longer emitted to the sink topic when a HAVING clause excludes a row from the result _that has never been in the result table_.

BREAKING CHANGE: This change fixes a _bug_ where unnecessary tombstones were being emitted when a `HAVING` clause filtered out a row from the source that is not in the output table.

For example, given:

```sql
-- source stream:
CREATE STREAM FOO (ID INT KEY, VAL INT) WITH (...);

-- aggregate into a table:
CREATE TABLE BAR AS
    SELECT ID, SUM(VAL) AS SUM
    FROM FOO
    GROUP BY ID
    HAVING SUM(VAL) > 0;


-- insert some values into the stream:
INSERT INTO FOO VALUES(1, -5); 
INSERT INTO FOO VALUES(1, 6); 
INSERT INTO FOO VALUES(1, -2); 
INSERT INTO FOO VALUES(1, -1); 
```

Previously, the sink topic `BAR` would have contained the records:

| Key | Value | Notes |
|-----|-------|-------|
| 1 | null | Spurious tombstone: the table does not contain a row with key `1`, so no tombstone is required. |
| 1 | {sum=1} | Row added as HAVING criteria now met |
| 1 | null | Row deleted as HAVING criteria now not met |
| 1 | null | Spurious tombstone: the table does not contain a row with key `1`, so no tombstone is required. |

Note: the first record in the tom

The topic will now contain:

| Key | Value |
|-----|-------|
| 1 | {sum=1} |
| 1 | null |

Co-authored-by: Andy Coates <[email protected]>
@big-andy-coates big-andy-coates added this to the 0.14.0 milestone Oct 12, 2020