Skip to content

Commit

Permalink
refactor: introducing BaseComparableKudaf with a MIN/MAX Kudaf implem…
Browse files Browse the repository at this point in the history
…entations for all comparable classes (removing 12 type specific implementations)
  • Loading branch information
pgaref committed Mar 25, 2022
1 parent 8592e9c commit 32c1173
Show file tree
Hide file tree
Showing 31 changed files with 132 additions and 538 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import java.util.function.Function;
import org.apache.kafka.streams.kstream.Merger;

public abstract class BaseComparableKudaf<T extends Comparable> extends
BaseAggregateFunction<T, T, T> {
public abstract class BaseComparableKudaf<T> extends BaseAggregateFunction<T, T, T> {

private final BiFunction<T, T, T> aggregatePrimitive;

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,15 @@ public KsqlAggregateFunction createAggregateFunction(
final SqlType argSchema = argTypeList.get(0).getSqlTypeOrThrow();
switch (argSchema.baseType()) {
case INTEGER:
return new IntegerMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
case BIGINT:
return new LongMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
case DOUBLE:
return new DoubleMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
case DECIMAL:
return new DecimalMaxKudaf(FUNCTION_NAME, initArgs.udafIndex(), argSchema);
case DATE:
return new DateMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
case TIME:
return new TimeMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
case TIMESTAMP:
return new TimestampMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
return new MaxComparableKudaf(FUNCTION_NAME, initArgs.udafIndex(), argSchema);
default:
throw new KsqlException("No Max aggregate function with " + argTypeList.get(0) + " "
throw new KsqlException("No MAX aggregate function with " + argTypeList.get(0) + " "
+ "argument type exists!");

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@
package io.confluent.ksql.function.udaf.max;

import io.confluent.ksql.function.udaf.BaseComparableKudaf;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.sql.Date;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.utils.FormatOptions;

public class DateMaxKudaf extends BaseComparableKudaf<Date> {
public class MaxComparableKudaf<T extends Comparable<? super T>> extends BaseComparableKudaf<T> {

DateMaxKudaf(
MaxComparableKudaf(
final String functionName,
final Integer argIndexInValue
final Integer argIndexInValue,
final SqlType outputSchema
) {
super(functionName,
argIndexInValue,
SqlTypes.DATE,
outputSchema,
(first, second) -> first.compareTo(second) > 0 ? first : second,
"Computes the maximum date value for a key.");
"Computes the maximum " + outputSchema.toString(FormatOptions.none())
+ " value for a key.");
}

}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 32c1173

Please sign in to comment.