Skip to content

Commit

Permalink
feat: Support MIN/MAX udafs for Time/TS/Date types (#8924)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgaref authored Mar 30, 2022
1 parent 247e6a5 commit 6399d30
Show file tree
Hide file tree
Showing 46 changed files with 4,227 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public abstract class AggregateFunctionFactory {
.add(ImmutableList.of(ParamTypes.DECIMAL))
.build();

protected static final ImmutableList<List<ParamType>> NUMERICAL_TIME = ImmutableList
.<List<ParamType>>builder().addAll(NUMERICAL_ARGS)
.add(ImmutableList.of(ParamTypes.DATE))
.add(ImmutableList.of(ParamTypes.TIME))
.add(ImmutableList.of(ParamTypes.TIMESTAMP))
.build();

public AggregateFunctionFactory(final String functionName) {
this(new UdfMetadata(
functionName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import java.util.function.Function;
import org.apache.kafka.streams.kstream.Merger;

public abstract class BaseNumberKudaf<T extends Number> extends BaseAggregateFunction<T, T, T> {
public abstract class BaseComparableKudaf<T extends Comparable<? super T>> extends
BaseAggregateFunction<T, T, T> {

private final BiFunction<T, T, T> aggregatePrimitive;

public BaseNumberKudaf(
public BaseComparableKudaf(
final String functionName,
final Integer argIndexInValue,
final SqlType outputSchema,
Expand All @@ -44,7 +45,7 @@ public BaseNumberKudaf(
outputSchema,
Collections.singletonList(
new ParameterInfo(
"number",
"value",
SchemaConverters.sqlToFunctionConverter().toFunctionType(outputSchema),
"the value to aggregate",
false)
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,23 @@ public KsqlAggregateFunction createAggregateFunction(
final SqlType argSchema = argTypeList.get(0).getSqlTypeOrThrow();
switch (argSchema.baseType()) {
case INTEGER:
return new IntegerMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
case BIGINT:
return new LongMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
case DOUBLE:
return new DoubleMaxKudaf(FUNCTION_NAME, initArgs.udafIndex());
case DECIMAL:
return new DecimalMaxKudaf(FUNCTION_NAME, initArgs.udafIndex(), argSchema);
case DATE:
case TIME:
case TIMESTAMP:
return new MaxKudaf(FUNCTION_NAME, initArgs.udafIndex(), argSchema);
default:
throw new KsqlException("No Max aggregate function with " + argTypeList.get(0) + " "
+ " argument type exists!");
throw new KsqlException("No MAX aggregate function with " + argTypeList.get(0) + " "
+ "argument type exists!");

}
}

@Override
@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "NUMERICAL_ARGS is ImmutableList")
@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "NUMERICAL_TIME is ImmutableList")
public List<List<ParamType>> supportedArgs() {
return NUMERICAL_ARGS;
return NUMERICAL_TIME;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*
* Copyright 2019 Confluent Inc.
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
Expand All @@ -15,22 +15,23 @@

package io.confluent.ksql.function.udaf.max;

import io.confluent.ksql.function.udaf.BaseNumberKudaf;
import io.confluent.ksql.function.udaf.BaseComparableKudaf;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.math.BigDecimal;
import io.confluent.ksql.schema.utils.FormatOptions;

public class DecimalMaxKudaf extends BaseNumberKudaf<BigDecimal> {
public class MaxKudaf<T extends Comparable<? super T>> extends BaseComparableKudaf<T> {

DecimalMaxKudaf(
MaxKudaf(
final String functionName,
final Integer argIndexInValue,
final SqlType returnSchema
final SqlType outputSchema
) {
super(functionName,
argIndexInValue,
returnSchema,
BigDecimal::max,
"Computes the maximum decimal value for a key.");
argIndexInValue,
outputSchema,
(first, second) -> first.compareTo(second) > 0 ? first : second,
"Computes the maximum " + outputSchema.toString(FormatOptions.none())
+ " value for a key.");
}

}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,23 @@ public KsqlAggregateFunction createAggregateFunction(
final SqlType argSchema = argTypeList.get(0).getSqlTypeOrThrow();
switch (argSchema.baseType()) {
case INTEGER:
return new IntegerMinKudaf(FUNCTION_NAME, initArgs.udafIndex());
case BIGINT:
return new LongMinKudaf(FUNCTION_NAME, initArgs.udafIndex());
case DOUBLE:
return new DoubleMinKudaf(FUNCTION_NAME, initArgs.udafIndex());
case DECIMAL:
return new DecimalMinKudaf(FUNCTION_NAME, initArgs.udafIndex(), argSchema);
case DATE:
case TIME:
case TIMESTAMP:
return new MinKudaf(FUNCTION_NAME, initArgs.udafIndex(), argSchema);
default:
throw new KsqlException("No Max aggregate function with " + argTypeList.get(0) + " "
+ " argument type exists!");
throw new KsqlException("No MIN aggregate function with " + argTypeList.get(0) + " "
+ "argument type exists!");

}
}

@Override
@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "NUMERICAL_ARGS is ImmutableList")
@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "NUMERICAL_TIME is ImmutableList")
public List<List<ParamType>> supportedArgs() {
return NUMERICAL_ARGS;
return NUMERICAL_TIME;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*
* Copyright 2019 Confluent Inc.
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
Expand All @@ -15,22 +15,23 @@

package io.confluent.ksql.function.udaf.min;

import io.confluent.ksql.function.udaf.BaseNumberKudaf;
import io.confluent.ksql.function.udaf.BaseComparableKudaf;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.math.BigDecimal;
import io.confluent.ksql.schema.utils.FormatOptions;

public class DecimalMinKudaf extends BaseNumberKudaf<BigDecimal> {
public class MinKudaf<T extends Comparable<? super T>> extends BaseComparableKudaf<T> {

DecimalMinKudaf(
MinKudaf(
final String functionName,
final Integer argIndexInValue,
final SqlType returnSchema
final SqlType outputSchema
) {
super(functionName,
argIndexInValue,
returnSchema,
BigDecimal::min,
"Computes the minimum decimal value for a key.");
argIndexInValue,
outputSchema,
(first, second) -> first.compareTo(second) < 0 ? first : second,
"Computes the minimum " + outputSchema.toString(FormatOptions.none())
+ " value for a key.");
}

}
Loading

0 comments on commit 6399d30

Please sign in to comment.