-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: enable new emit-final implementation #9141
Conversation
@@ -80,7 +92,7 @@ public String toString() { | |||
+ "ADVANCE BY " + advanceBy | |||
+ retention.map(w -> " , RETENTION " + w).orElse("") | |||
+ gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") | |||
+ " ) "; | |||
+ " ) " + emitStrategy.map(Enum::toString).orElse(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nasty hack -- did not find a better way to preserve the information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For other readers: this "hack" is being added so that deserialization works properly here: #9141 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax -- some initial comments/questions inline.
@@ -73,12 +85,14 @@ public boolean equals(final Object o) { | |||
return false; | |||
} | |||
final WindowInfo that = (WindowInfo) o; | |||
// we omit `emitStrategy` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we omit this from the comparator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WindowInfo
should actually only describe the window itself and some code relies on it -- emit-strategy is not really a window property and it's just bolted on her as it's hard to get a hold on it otherwise.
Because emit-strategy
could be set on one instance but not on another, while both instances describe the same window, we omit it here to not break the comparison.
(IIRC)
ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java
Show resolved
Hide resolved
|
||
final KsqlWindowExpression windowExpression; | ||
|
||
if (extendedWindowExpression.endsWith("FINAL")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of this hack, can we update the serializer/deserializer to serialize an additional, optional field to specify the emitStrategy? Ideally there'd be a way to deserialize a window object that is either represented as a raw string (legacy) or as an object (new).
@@ -62,7 +73,7 @@ public String toString() { | |||
return " SESSION ( " + gap | |||
+ retention.map(w -> " , RETENTION " + w).orElse("") | |||
+ gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") | |||
+ " ) "; | |||
+ " ) " + emitStrategy.map(Enum::toString).orElse(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nasty hack -- did not find a better way to preserve the information.
@@ -65,7 +73,7 @@ public String toString() { | |||
return " TUMBLING ( SIZE " + size | |||
+ retention.map(w -> " , RETENTION " + w).orElse("") | |||
+ gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("") | |||
+ " ) "; | |||
+ " ) " + emitStrategy.map(Enum::toString).orElse(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nasty hack -- did not find a better way to preserve the information.
|
||
final KsqlWindowExpression windowExpression; | ||
|
||
if (extendedWindowExpression.endsWith("FINAL")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nasty hack -- did not find a better way to preserve the information.
private static boolean isInternalStreamsConfig(final String key) { | ||
// add more internal configs on demand | ||
return key.equals(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX) | ||
|| key.equals(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need this one to run QTTs...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this only meant for QTT or are we expecting users to want/be able to set this for their own queries too? (Guessing the latter but want to check.)
Also, the first config (outer join spurious results fix emit interval) is unrelated to this PR, right? Is that another config we think users might want to set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both are internal KS configs -- they control how often we try to emit results -- the other configs is for left/outer stream-stream joins (yes, it's unrelated to this PR, but serves the exact same purpose for a different operator).
We need them for QTT to ensure we alway try to emit result (both configs work on wall-clock time); otherwise, in QTT results might get "stuck" and are not emitted. I remember that for stream-stream join back in the days, we did not add test for all scenarios because we did not have access to this config.
Because both are internal, we don't really expect users to set them (with the exception that there is a perf problem and we tell them to change it). But we should be able for set the configs IMHO -- otherwise, the guard they provide for KS does not work for ksqlDB.
There are a few more internal configs, and we could consider to add them here, too. Also happy to remove the second config from this PR and we do some follow up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense. Just to confirm: users will be able to set these internal streams configs on their ksql queries (and have the configs take effect) after this change, right?
I think it's fine to leave the second config in -- we're essentially saying it's safe for users to set these configs on their ksql queries if they want to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically, yes.
Of course, both configs are "internal" meaning it's not documented anywhere (on purpose) and user most likely don't know about them (and should not need to know about them in general).
throw new KsqlException("EMIT FINAL is only supported for windowed aggregations."); | ||
} | ||
if (refinementInfo.getOutputRefinement() != OutputRefinement.FINAL_PERSISTENT) { | ||
currentNode = buildSuppressNode( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only add old suppress()
for old/historic plans. New FINAL_PERSISTENT
does not use suppress()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax -- LGTM at a high level. Some questions inline.
private static boolean isInternalStreamsConfig(final String key) { | ||
// add more internal configs on demand | ||
return key.equals(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX) | ||
|| key.equals(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this only meant for QTT or are we expecting users to want/be able to set this for their own queries too? (Guessing the latter but want to check.)
Also, the first config (outer join spurious results fix emit interval) is unrelated to this PR, right? Is that another config we think users might want to set?
* | ||
* <p>For a table, this would mean output only the finalized result per-key. | ||
* | ||
* <p>For a stream, all events are final, so all are output. | ||
*/ | ||
FINAL; | ||
FINAL, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this old value still needed after this PR? It should be obsolete, right? Existing queries have suppress nodes in their plans, and new queries will only ever set FINAL_PERSISTENT
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 -- fair point. Could be that we don't need it...
Wondering if we should remove it? Or if we should re-purpose it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The distinction would only be internal, right? As in, users will never see FINAL
vs FINAL_PERSISTENT
, and we also don't write anything of the sort into the command topic. If so, I think either's fine since we'd always be able to change it in the future. My vote is to repurpose FINAL
if you feel confident that it's safe, else replace FINAL
with FINAL_PERSISTENT
since we know for sure that's safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, internal only, there is just EMIT FINAL
syntax for the user and if this is merged it should always use FINAL_PERSISTENT
internally.
My vote is to repurpose
Ack. Will look into it. Was not 100% sure if it would be save, but seems it should be. Better to cleanup right away.
@@ -73,12 +85,14 @@ public boolean equals(final Object o) { | |||
return false; | |||
} | |||
final WindowInfo that = (WindowInfo) o; | |||
// we omit `emitStrategy` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a "philosophical" choice (saying that conceptually a window info doesn't care about emit strategy), rather than a practical choice, right? As in, nothing breaks if we decide to include emitStrategy in the equals/hashCode methods, does it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some stuff did break... I originally included it and only remove it later and added the comment (cannot remember from the top of my head what did break though).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting! I'd be curious to know what broke, but if it's difficult to find out again then don't worry about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could not reproduce it -- maybe something change in the mean time. Added it to both equals
and hashCode
.
.addEqualityGroup( | ||
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1020)), Optional.of(OutputRefinement.CHANGES)), | ||
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1020)), Optional.of(OutputRefinement.FINAL)), | ||
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1020)), Optional.of(OutputRefinement.FINAL_PERSISTENT)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: empty optional should compare equal in this group too
// we only need to rewrite if we have a window expression and if we use emit final | ||
if (!windowExpression.isPresent() | ||
|| !refinementInfo.isPresent() | ||
|| !refinementInfo.get().isFinal()) { // can we test for isFinal? or memory vs rocks? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the concern here? Are you saying that users of in-memory stores wouldn't want to enable the new emit strategy? (Probably shouldn't affect ksql since ksql always uses rocks, just trying to understand.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We test for !isFinal()
that returns !true
for FINAL (in-memory) || FINAL_PERSISTENT (rocks)
?
The question is, if we need to test for only !FINAL_PERSISTENT
instead?
Given your other comment about FINAL
maybe being obsolete, the answer is "no". However, if we should return the unmodified window expression only for FINAL
(but not for FINAL_PERSISTENT
) the answer might be "yes".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, I initially misunderstood what you meant by "memory vs rocks". I would hope that only new queries go through the RewrittenAnalysis so the two should be equivalent. It's been a while since I've worked with this part of the code, though.
@@ -3,7 +3,8 @@ | |||
{ | |||
"name": "should emit final result immediately at window end if grace is specified as zero", | |||
"properties": { | |||
"ksql.suppress.enabled": true | |||
"ksql.suppress.enabled": true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need new historic plans for these existing QTTs with this change, right? The query plans should've changed to no longer have the suppress.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. I did not yet create historical plan on purpose, as it "pollutes" the PR (and I need to wait for the session window KS PR anyway)
ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java
Show resolved
Hide resolved
@@ -73,20 +85,35 @@ public boolean equals(final Object o) { | |||
return false; | |||
} | |||
final WindowInfo that = (WindowInfo) o; | |||
|
|||
final Optional<OutputRefinement> thisEmit = emitStrategy.isPresent() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vcrfxia -- I hope this make sense? The idea is to make empty
and changes
equal (also cf hashCode
below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense but I have a code style question -- if empty is always the same as EMIT CHANGES, then can be change the constructor to store OutputRefinement
rather than Optional<OutputRefinement>
and use emitStrategy.orElse(OutputRefinement.CHANGES)
in the constructor, in order to simplify these methods here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did consider it, but was not sure -- as you bring it up, too, seems to be the better choice. Let me update it.
@@ -73,20 +85,35 @@ public boolean equals(final Object o) { | |||
return false; | |||
} | |||
final WindowInfo that = (WindowInfo) o; | |||
|
|||
final Optional<OutputRefinement> thisEmit = emitStrategy.isPresent() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense but I have a code style question -- if empty is always the same as EMIT CHANGES, then can be change the constructor to store OutputRefinement
rather than Optional<OutputRefinement>
and use emitStrategy.orElse(OutputRefinement.CHANGES)
in the constructor, in order to simplify these methods here?
@@ -193,6 +192,7 @@ public void testSimpleRightJoinFilterLogicalPlan() { | |||
assertThat(rightSource.getSources().get(0), instanceOf(PreJoinRepartitionNode.class)); | |||
} | |||
|
|||
@Ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the latest commit adds this Ignore annotation to a number of unit tests. I'm guessing you have plans to address these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woohoo, thanks @mjsax ! LGTM with some non-blocking questions/comments inline.
@@ -64,6 +74,10 @@ public Optional<Duration> getSize() { | |||
return size; | |||
} | |||
|
|||
public OutputRefinement getEmitStrategy() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the serialize/deserialize to/from JSON loop work for this, now that this getter is updated to return OutputRefinement
while the constructor still accepts Optional<OutputRefinement>
? (Hopefully yes and I assume yes if QTTs are passing, just curious since I haven't encountered the situation before.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 -- Interesting question... To me it just did not make sense to return an Optional
if we know that it's always set... Did not think about JSON (de)serialization. But I agree, it seem to work so we should be good?
@@ -20,9 +20,6 @@ | |||
}, | |||
{ | |||
"name": "should not emit final result before window end if grace is specified as zero", | |||
"properties": { | |||
"ksql.suppress.enabled": true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How come most of the tests have "ksql.streams.__emit.interval.ms.kstreams.windowed.aggregation__": 0
added, but this one and the one immediately below it don't?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guess it still passed because we expect no output (and thus I missed it), but I think we should set it to zero to not accidentally mask a bug.
Great catch!
@@ -328,10 +328,9 @@ CREATE OR REPLACE TABLE b AS SELECT id, COUNT(*) | |||
---------------------------------------------------------------------------------------------------- | |||
--@test: add filter to Suppress | |||
[email protected]: io.confluent.ksql.util.KsqlException | |||
[email protected]: Upgrades not yet supported for TableSuppress | |||
[email protected]: Upgrades not yet supported for StreamWindowedAggregate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine simply deleting this test rather than updating the expected error message. There's already another one which expects Upgrades not yet supported for StreamWindowedAggregate
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe @agavra can chime in if this test still adds value or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah we can probably remove this test - the idea was that if you had a suppress it would fail regardless of what was being aggregated, but I don't think it adds much value since suppress only makes sense with windowed aggregates anyway (which we already test for above)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Follow up PR to remove this test: #9308
No description provided.