Skip to content

Commit

Permalink
feat: Generalize the UDAFs earliest_by_offset and latest_by_offset
Browse files Browse the repository at this point in the history
Addresses: #5437 and #8368
  • Loading branch information
jnh5y committed Mar 11, 2022
1 parent 2dae2a1 commit d3cd92a
Show file tree
Hide file tree
Showing 21 changed files with 2,529 additions and 308 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ class UdafFactoryInvoker implements FunctionSignature {
private static final Logger LOG = LoggerFactory.getLogger(UdafFactoryInvoker.class);

private final FunctionName functionName;
private final ParamType aggregateArgType;
private final ParamType aggregateReturnType;
private final Optional<Metrics> metrics;
private final List<ParamType> paramTypes;
private final List<ParameterInfo> params;
private final Method method;
private final String description;
private final UdafTypes types;
private final String aggregateSchema;
private final String outputSchema;
private ParamType aggregateReturnType;

UdafFactoryInvoker(
final Method method,
Expand All @@ -70,10 +72,12 @@ class UdafFactoryInvoker implements FunctionSignature {
if (!Modifier.isStatic(method.getModifiers())) {
throw new KsqlException("UDAF factory methods must be static " + method);
}
final UdafTypes types = new UdafTypes(method, functionName, typeParser);
this.types = new UdafTypes(method, functionName, typeParser);
this.functionName = Objects.requireNonNull(functionName);
this.aggregateArgType = Objects.requireNonNull(types.getAggregateSchema(aggregateSchema));
this.aggregateReturnType = Objects.requireNonNull(types.getOutputSchema(outputSchema));
this.aggregateSchema = aggregateSchema;
this.outputSchema = outputSchema;
//this.aggregateArgType = Objects.requireNonNull(types.getAggregateSchema(aggregateSchema));
//this.aggregateReturnType = Objects.requireNonNull(types.getOutputSchema(outputSchema));
this.metrics = Objects.requireNonNull(metrics);
this.params = types.getInputSchema(Objects.requireNonNull(inputSchema));
this.paramTypes = params.stream().map(ParameterInfo::type).collect(Collectors.toList());
Expand All @@ -95,10 +99,14 @@ KsqlAggregateFunction createFunction(final AggregateFunctionInitArguments initAr
}

final SqlType aggregateSqlType = (SqlType) udaf.getAggregateSqlType()
.orElseGet(() -> SchemaConverters.functionToSqlConverter().toSqlType(aggregateArgType));
.orElseGet(() -> SchemaConverters.functionToSqlConverter()
.toSqlType(types.getAggregateSchema(aggregateSchema)));
final SqlType returnSqlType = (SqlType) udaf.getReturnSqlType()
.orElseGet(() ->
SchemaConverters.functionToSqlConverter().toSqlType(aggregateReturnType));
SchemaConverters.functionToSqlConverter()
.toSqlType(types.getOutputSchema(outputSchema)));
this.aggregateReturnType =
SchemaConverters.sqlToFunctionConverter().toFunctionType(returnSqlType);

final KsqlAggregateFunction function;
if (TableUdaf.class.isAssignableFrom(method.getReturnType())) {
Expand Down
Loading

0 comments on commit d3cd92a

Please sign in to comment.