-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Generalize the UDAFs earliest_by_offset and latest_by_offset #8878
Conversation
@@ -86,7 +89,8 @@ public void shouldThrowExceptionForInvalidN() { | |||
@Test | |||
public void shouldComputeEarliestInteger() { | |||
// Given: | |||
final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset.earliestInteger(); | |||
final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset.earliest(); | |||
udaf.initializeTypeArguments(Collections.singletonList(SqlArgument.of(SqlTypes.INTEGER))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The biggest question I have is if we want to keep these unit tests.
The previous methods like earliestInteger
have been removed. Since earliest_by_offset
is now configured with the input SQL type, this test is really quite silly looking. That said, with the modification, it does show that all the various details work.
Should these be left or removed? Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm +1 to removing it - I don't think it adds much value, especially if we have the QTTs
I've put this up as a draft PR to ask a question about how the unit tests should be managed. Once that's settled, I'll sort out |
@@ -1000,7 +1000,7 @@ public void shouldThrowIfMissingAggregateTypeSchema() throws Exception { | |||
assertThat(e.getMessage(), containsString("Must specify 'aggregateSchema' for STRUCT parameter in @UdafFactory.")); | |||
} | |||
|
|||
@Test | |||
//@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, another thing to discuss... functions either need an annotated type or compute aggregate/return types.
I'll think about these more next week. Shout if you have an options about how to sort this out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😅 😆 😈 I know I promised I'd drop it, but if we went back to having a wrapper on the three methods you could check if calling that returns the default implementation or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test / behavior is changing since one can either have types given with an annotation or via the new methods.
If we had a wrapper, we'd still be in the same situation between annotations and the wrapper. 🤷
I'll see what I come up when I look again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I've sorted this out by adding a little to the error to indicate choice in implementation.
ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/offset/KudafByOffsetUtils.java
Outdated
Show resolved
Hide resolved
@@ -1000,7 +1000,7 @@ public void shouldThrowIfMissingAggregateTypeSchema() throws Exception { | |||
assertThat(e.getMessage(), containsString("Must specify 'aggregateSchema' for STRUCT parameter in @UdafFactory.")); | |||
} | |||
|
|||
@Test | |||
//@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😅 😆 😈 I know I promised I'd drop it, but if we went back to having a wrapper on the three methods you could check if calling that returns the default implementation or not
@@ -86,7 +89,8 @@ public void shouldThrowExceptionForInvalidN() { | |||
@Test | |||
public void shouldComputeEarliestInteger() { | |||
// Given: | |||
final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset.earliestInteger(); | |||
final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset.earliest(); | |||
udaf.initializeTypeArguments(Collections.singletonList(SqlArgument.of(SqlTypes.INTEGER))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm +1 to removing it - I don't think it adds much value, especially if we have the QTTs
I nuked the LatestByOffsetTest and left a little bit of the EarliestByOffsetTest. Shout if I should go ahead and remove it completely. |
I still have the same issue on Confluent 7.1.2 on premises version, the LATEST_BY_OFFSET can not be used with Array type, is this a regression on Confluent 7.1.2? I also tried v7.1.1 and v7.0.0, any ideas how to use this fix on on premises installation? |
@Alter11 This change is not in 7.1.x, so it is not a regression. I believe that it should be in Confluent 7.2.0 which has just been released. Confluent Platform releases a new minor (or major) version quarterly. Since there's a rigorous testing process, branches are cut well in advance; that's why this isn't in CP 7.1.x. |
Addresses: #5437 and #8368
Description
The goal is to allow the UDAFs earliest_by_offset and latest_by_offset to operate on arbitrary input types.
Testing done
QTTs have been added to show the functions working generally.
Reviewer checklist