CTAS windows missing data when time already advanced to new window and inserting late-arriving records. #4579
Comments
Interesting as well is this behavior:
Insert some data for today: the result shows up in the running query.
Insert some data for yesterday (= the previous window):
I thought late-arriving records should be added to a window even if it's in the past. In my case here I am inserting at 17:00 into yesterday's window…
Figured out where the missing data goes by setting DEBUG on org.apache.kafka.streams.kstream.internals:
This is related to https://issues.apache.org/jira/browse/KAFKA-8769 and cannot be controlled via ksql, because we cannot set
KStreamWindowAggregate sets the observed time for the current partition to:
This means that any older record inserted for the same key does not end up in the window, if not
This creates a bunch of problems for us when a new window starts at 00:00 and records for the previous window are still arriving in the same partition. This does not work:
The second record is discarded:
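To illustrate the behavior described in this comment, here is a minimal Python sketch of a stream-time based drop rule. It is a simplified model, not the actual Kafka Streams code: the class name `TumblingWindowAggregate`, the 1-day window size, and the grace values are all assumptions chosen for the example. The model tracks the highest timestamp seen so far as observed stream time and rejects a record when its window ended at or before that time minus the grace period.

```python
from dataclasses import dataclass, field

HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS


@dataclass
class TumblingWindowAggregate:
    """Hypothetical, simplified model of a windowed count with a grace period."""
    size_ms: int
    grace_ms: int
    observed_stream_time: int = -1
    counts: dict = field(default_factory=dict)
    dropped: list = field(default_factory=list)

    def process(self, key: str, timestamp_ms: int) -> bool:
        # Stream time only moves forward: it is the max timestamp seen so far.
        self.observed_stream_time = max(self.observed_stream_time, timestamp_ms)
        close_time = self.observed_stream_time - self.grace_ms

        # Tumbling windows: each timestamp belongs to exactly one window.
        window_start = timestamp_ms - (timestamp_ms % self.size_ms)
        window_end = window_start + self.size_ms

        if window_end > close_time:
            slot = (key, window_start)
            self.counts[slot] = self.counts.get(slot, 0) + 1
            return True

        # The window is already closed relative to stream time: drop the record.
        self.dropped.append((key, timestamp_ms))
        return False


# Assumed scenario: it is 17:00 "today", then a record for 17:00 "yesterday" arrives.
today_17 = 10 * DAY_MS + 17 * HOUR_MS
yesterday_17 = today_17 - DAY_MS

no_grace = TumblingWindowAggregate(size_ms=DAY_MS, grace_ms=0)
first_ok = no_grace.process("Chris", today_17)
second_ok = no_grace.process("Chris", yesterday_17)

with_grace = TumblingWindowAggregate(size_ms=DAY_MS, grace_ms=DAY_MS)
with_grace.process("Chris", today_17)
late_ok = with_grace.process("Chris", yesterday_17)

print(first_ok, second_ok, late_ok)
```

In this model the late record is only accepted when the grace period is long enough to keep yesterday's window open, which is why the ability to configure a grace period matters for this use case.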
Closing, because this issue will probably be resolved with #4733.
Describe the bug
CTAS with older data does not put the data into the defined windows. Sorry for renaming the issue a million times.
To Reproduce
Single node kafka/zk/… with 5.4.0, cp-ksql-server
Retention setting on broker 700 days
Make the retention for changelog topics really big and set auto offset reset:
Use the tutorial from https://kafka-tutorials.confluent.io/create-tumbling-windows/ksql.html#write-the-program-interactively-using-the-cli updated for 5.4.0
and fill the ratings topic with some data. I ended up with this (only relevant columns shown – just added more random data in the past):
Now create a windowed table
Actual behaviour
and select everything:
Quite a lot of data is missing, e.g. everything with "Chris".
The same happens with 8 hour windows. The old records simply don't make it into the window if the window is in the "yesterday" time frame.
Expected behavior
All old data should be there, especially since the message timestamps in the ratings input topic are current and nothing is deleted on the broker.
Additional context
I managed to get debug logging on ksql-server. We often see
DEBUG Skipping record for expired segment. (org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore)
but the logging is not very detailed, so we cannot tell from the log which records were skipped.
Changelog topic is automatically set to
cleanup.policy | compact,delete
retention.ms | 30086400000
The output table sink topic has the broker default (700 days).
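For a sense of scale, the retention values quoted above can be converted into days. This is only arithmetic on the numbers from this report; the constant names are made up for the sketch.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000

# Values taken from the report above.
changelog_retention_ms = 30_086_400_000
broker_retention_days = 700

changelog_retention_days = changelog_retention_ms / MS_PER_DAY
broker_retention_ms = broker_retention_days * MS_PER_DAY

print(f"changelog retention: {changelog_retention_days:.2f} days")
print(f"broker default:      {broker_retention_ms} ms")
```

The changelog retention works out to roughly 348 days, about half of the 700-day broker default that applies to the sink topic.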