Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: klip-36: grace period for stream-stream joins #6175

Merged
merged 2 commits into from
Sep 23, 2020

Conversation

agavra
Copy link
Contributor

@agavra agavra commented Sep 10, 2020

@agavra agavra requested a review from a team as a code owner September 10, 2020 02:04
JOIN <stream> WITHIN (SIZE <time unit>, GRACE PERIOD <time unit>) ON <condition>
```

The old syntax will still be supported for backwards compatibility.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there plans to remove the old syntax eventually?

For my own education: my understanding was that GRACE PERIOD is optional in aggregations? Is this correct? Would it be optional for stream-stream joins, too?

For Kafka Streams, we had a discussions about making it a mandatory parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there plans to remove the old syntax eventually?

I think it would make sense to remove the old syntax for 1.0.

Would it be optional for stream-stream joins, too?

Yes (this is necessary to allow the old syntax as well)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, even if we make it required, we could still set 24h if the old syntax is used, when translating the old to the new internally?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With reference to recent discussions around supporting older versions of the syntax for longer... we'd likely want to support the old syntax for a while.


```md
When you join two streams, you must specify a WITHIN clause for matching
records that both occur within a specified time interval and optionally a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so it should be optionally?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious to hear what other people think? Should GRACE PERIOD be optional? The current default of 24h is rather "random" and using a default of zero implies bad out-of-the-box behavior with regard to out-of-order data -- we would discard out-of-order records very aggressively as later records.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making it optional would follow the current pattern when you specify the grace period on a windowed aggregation. As to the default of 24h, I'm not sure what the right answer here is. For my specific use case of KSQL it's way too big, although for someone else it could be too small. Ideally I'd like a init parameter I could set to have better control over the default behavior. Then a 24h value as a default is probably a good starting point.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making it optional would follow the current pattern when you specify the grace period on a windowed aggregation.

Yes, but I am wondering if it should be mandatory for windowed aggregations, too? (Might be it's own KLIP thought.)

As to the default of 24h, I'm not sure what the right answer here is.

I guess there is no right answer. This is exactly my point: if there is no reasonable default, it seems to imply that there shouldn't be any default but it should be mandatory?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think long term if should be required. However, we need to be backwards compatible, which means in the medium term it needs to be optional.

Personally, I would change the wording from and optionally `` to and it is recommended to supply a`.

In the docs I'd also detail what the default it and explain what the trade of is here, i.e. longer grace period means more storage, shorter grace period means less storage required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @big-andy-coates, considering backward compatibility. We should recommend setting the grace period, but not require it for now.

p.id as paymentId
FROM orders o
INNER JOIN payments p WITHIN 1 HOURS ON p.id = o.id
INNER JOIN shipments s WITHIN (SIZE 2 HOURS, GRACE PERIOD 30 MINUTES) ON s.id = o.id;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like some optimization potential? If one joins specifies a grace-period, we could auto-set it for the other join, too, saving storage but not losing any results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be valid to have different grace periods on different windows? Why would we want to auto-set it for the other window? I think you understand something I don't 😂

Copy link
Member

@mjsax mjsax Sep 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, you can of course allow this, but it won't buy the user anything (depending on the order of the joins). It seems the lower grace period of the second join limits what you get in your result anyway? If the first join allows to join two records that are 20h old, the result timestamp of the intermediate record would be 20h, too, and thus the second join would drop it.

There would be some details to figure out, ie, maybe think hard (or formally prove) that it's safe to reduce the grace period (but this might not necessarily be part of the KLIP I guess -- was just a thought and we might want to track it at least with a ticket).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured your reasoning would be something along those lines, but I wasn't sure that that hold for all window size/grace period combinations. I need to wrap my head around it a little more (I'll try to get back with a more concrete example), but I'm not convinced that that's always safe to drop - e.g. what would happen if the second window size was much larger than the first window size in your example?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. It's not totally straightforward when we can do what... And as I said, we don't need to make it part of the KLIP. But we should have a ticket "Optimize grace periods for chained stream-stream joins" (maybe with one or two illustrating examples) -- just to make sure we get to it at some point. We also don't need to implement it as part of the KLIP.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused here by terminology. The WITHIN clause applies to the non-triggering join argument/to the join argument on which it is specified? This means, in this example that we check the timestamp of the record of payments? And the GRACE clause applies as well to the join argument on which it is specified? In this example, shipments? So, to accept out-of-order records we just check the timestamp of shipment records?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The grace period is base on "stream-time" progress. Ie, if an out-of-order record comes in, with ts < stream-time - grace-period the record is dropped. So records from either input may be dropped as late. Note that "stream-time" is advanced based on records of both input streams.

Does this answer your question?

Copy link
Member

@vpapavas vpapavas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGMT! I just have one question about the semantics as I got confused but it shouldn't block the KLIP

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM :D

JOIN <stream> WITHIN (SIZE <time unit>, GRACE PERIOD <time unit>) ON <condition>
```

The old syntax will still be supported for backwards compatibility.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With reference to recent discussions around supporting older versions of the syntax for longer... we'd likely want to support the old syntax for a while.


```md
When you join two streams, you must specify a WITHIN clause for matching
records that both occur within a specified time interval and optionally a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think long term if should be required. However, we need to be backwards compatible, which means in the medium term it needs to be optional.

Personally, I would change the wording from and optionally `` to and it is recommended to supply a`.

In the docs I'd also detail what the default it and explain what the trade of is here, i.e. longer grace period means more storage, shorter grace period means less storage required.

Comment on lines +95 to +101
- We should consider whether we want to support the older syntax or require all new joins
to use the `(SIZE <size>, GRACE PERIOD <size>)` syntax (noting that `GRACE PERIOD` will
be optional).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably avoid breaking changes in the syntax until we have a mechanism for dealing with them and supporting multiple language versions. That said, I guess the KLIP can still detail what the final outcome should be, even if we delay implementing the breaking change for a bit.

## Test plan

We will add the usual QTT tests to ensure that the system respects the new retention limits
and manually test to ensure that we clean up the RocksDB state for expired windows.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's too bad there's no easy way to automate this. I was wondering if we could do something similar to KsMaterializationFunctionalTest which verifies retention for windowed aggregates (link) but we'd have to expose the state stores for that which feels like overkill for purposes of testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the saving grace here is that we can test that we properly set the grace period, and then trust that Kafka Streams tests their public APIs (we can't test everything in Kafka Streams - but that's OK)

@agavra agavra merged commit 3e7c531 into confluentinc:master Sep 23, 2020
@agavra agavra deleted the klip-36 branch September 23, 2020 19:59
@pavel-hp
Copy link

pavel-hp commented Nov 8, 2021

Have a question. Does this feature exists in Confluent (7.0.0) version of kSQL Db Server (image: confluentinc/cp-ksqldb-server:7.0.0)?

Cause I'm getting error

"You can use it as an identifier by escaping it as 'SIZE'"

as an example:
ksql> CREATE TABLE aggrByAccSubAccountSymbolWithPrice WITH (KAFKA_TOPIC='position-by-account-price', timestamp='timestamp', PARTITIONS=8, VALUE_FORMAT='JSON', KEY_FORMAT='JSON') AS SELECT LATEST_BY_OFFSET(f.amount) amount, ROUND(LATEST_BY_OFFSET(p.price), 2) price, ROUND(LATEST_BY_OFFSET(f.amount) * LATEST_BY_OFFSET(p.price), 2) marketValue, f.account account, f.subaccount subaccount, f.symbol symbol, LATEST_BY_OFFSET(accSubSymbol) accSubSymbol FROM aggrByAccSubSymbolKeySymbolStream f JOIN priceStream p WITHIN (SIZE 1000 MILLISECONDS, GRACE PERIOD 60000 MINUTES) ON f.symbol = p.symbol GROUP BY f.account, f.subAccount, f.symbol;

line 1:514: "SIZE" is a reserved keyword and it can't be used as an identifier. You can use it as an identifier by escaping it as 'SIZE' Statement: CREATE TABLE aggrByAccSubAccountSymbolWithPrice WITH (KAFKA_TOPIC='position-by-account-price', timestamp='timestamp', PARTITIONS=8, VALUE_FORMAT='JSON', KEY_FORMAT='JSON') AS SELECT LATEST_BY_OFFSET(f.amount) amount, ROUND(LATEST_BY_OFFSET(p.price), 2) price, ROUND(LATEST_BY_OFFSET(f.amount) * LATEST_BY_OFFSET(p.price), 2) marketValue, f.account account, f.subaccount subaccount, f.symbol symbol, LATEST_BY_OFFSET(accSubSymbol) accSubSymbol FROM aggrByAccSubSymbolKeySymbolStream f JOIN priceStream p WITHIN (SIZE 1000 MILLISECONDS, GRACE PERIOD 60000 MINUTES) ON f.symbol = p.symbol GROUP BY f.account, f.subAccount, f.symbol; Caused by: line 1:514: "SIZE" is a reserved keyword and it can't be used as an identifier. You can use it as an identifier by escaping it as 'SIZE'

@spena
Copy link
Member

spena commented Nov 8, 2021

It doesn't. It was reverted for some issues. The feature will be available in ksqlDB 0.22.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants