-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: make UDAFs configurable and remove limit on COLLECT_LIST/SET #6851
Conversation
6201d8e
to
b35ea2b
Compare
ksqldb-common/src/main/java/io/confluent/ksql/function/AggregateFunctionInitArguments.java
Outdated
Show resolved
Hide resolved
@Override | ||
public void configure(final Map<String, ?> map) { | ||
final Object limit = map.get(LIMIT_CONFIG); | ||
this.limit = (limit == null) ? this.limit : ((Number) limit).intValue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are negative values handled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, I will treat negative as "no limit". I'll add that to the documentation
ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectSetUdaf.java
Show resolved
Hide resolved
@@ -67,7 +67,8 @@ | |||
final List<FunctionCall> aggregations, | |||
final Optional<WindowExpression> windowExpression, | |||
final FormatInfo valueFormat, | |||
final QueryContext.Stacker contextStacker | |||
final Stacker contextStacker, | |||
final KsqlConfig config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This config
is not used here, right? I see SchemaKTable
is created with ksqlConfig
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, I used it in SchemaKGroupedTable
because I didn't realize it had access to the field ksqlConfig
. I removed it
ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java
Show resolved
Hide resolved
82937ff
to
40cc03a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I just left one single comment this time.
@@ -509,6 +509,10 @@ String getName() { | |||
public static final Set<String> SSL_CONFIG_NAMES = sslConfigNames(); | |||
public static final Set<String> STREAM_TOPIC_CONFIG_NAMES = streamTopicConfigNames(); | |||
|
|||
public static KsqlConfig empty() { | |||
return new KsqlConfig(ImmutableMap.of()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happened with the static EMPTY
? That could be reused it, isn't it? Perhaps have the variable back and just return it from this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was causing issues because of circular static dependencies. Some things depended on it in a static block and then things wouldn't load. It was very weird...
ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java
Show resolved
Hide resolved
40cc03a
to
93b73a3
Compare
Description
fixes #5738
fixes #6711
This commit adds the ability to configure UDAFs the same way that UDFs can be configured, by implementing the
Configurable
interface. By doing this, we can lift the restriction on the number of elements thatCOLLECT_LIST
andCOLLECT_SET
have without opening the flood gates in the confluent cloud offering (we can tune that number later).The "meat" of the PR is the on-liner in
UdafFactoryInvoker.java
and the rest is plumbing to make sure that we can get theKsqlConfig
down to that level.Testing done
Reviewer checklist