From d3cd92a0e2a7a54505e2e7b44f7a4ea1b6947683 Mon Sep 17 00:00:00 2001 From: Jim Hughes Date: Thu, 10 Mar 2022 20:40:05 -0500 Subject: [PATCH 1/5] feat: Generalize the UDAFs earliest_by_offset and latest_by_offset Addresses: https://github.com/confluentinc/ksql/issues/5437 and https://github.com/confluentinc/ksql/issues/8368 --- .../ksql/function/UdafFactoryInvoker.java | 22 +- .../udaf/offset/EarliestByOffset.java | 322 ++++-------------- .../udaf/offset/KudafByOffsetUtils.java | 9 + .../udaf/offset/EarliestByOffsetTest.java | 119 +++++-- .../7.2.0_1644852483241/plan.json | 229 +++++++++++++ .../7.2.0_1644852483241/spec.json | 241 +++++++++++++ .../7.2.0_1644852483241/topology | 25 ++ .../7.2.0_1644852482932/plan.json | 229 +++++++++++++ .../7.2.0_1644852482932/spec.json | 224 ++++++++++++ .../7.2.0_1644852482932/topology | 25 ++ .../7.2.0_1646962668589/plan.json | 234 +++++++++++++ .../7.2.0_1646962668589/spec.json | 145 ++++++++ .../7.2.0_1646962668589/topology | 25 ++ .../7.2.0_1644852483075/plan.json | 229 +++++++++++++ .../7.2.0_1644852483075/spec.json | 185 ++++++++++ .../7.2.0_1644852483075/topology | 25 ++ .../7.2.0_1644852483152/plan.json | 229 +++++++++++++ .../7.2.0_1644852483152/spec.json | 193 +++++++++++ .../7.2.0_1644852483152/topology | 25 ++ .../earliest-offset-udaf.json | 91 +++++ .../query-validation-tests/udaf.json | 11 - 21 files changed, 2529 insertions(+), 308 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_an_array_added/7.2.0_1644852483241/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_an_array_added/7.2.0_1644852483241/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_an_array_added/7.2.0_1644852483241/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_arrays,_structs,_and_maps/7.2.0_1644852482932/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_arrays,_structs,_and_maps/7.2.0_1644852482932/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_arrays,_structs,_and_maps/7.2.0_1644852482932/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_decimals/7.2.0_1646962668589/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_decimals/7.2.0_1646962668589/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_decimals/7.2.0_1646962668589/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_maps/7.2.0_1644852483075/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_maps/7.2.0_1644852483075/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_maps/7.2.0_1644852483075/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_structs/7.2.0_1644852483152/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_structs/7.2.0_1644852483152/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/earliest-offset-udaf_-_earliest_by_offset_with_structs/7.2.0_1644852483152/topology diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java index 741a27d71947..86753dfab243 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java @@ -41,13 +41,15 @@ class UdafFactoryInvoker implements FunctionSignature { private static final Logger LOG = LoggerFactory.getLogger(UdafFactoryInvoker.class); private final FunctionName functionName; - private final ParamType aggregateArgType; - private final ParamType aggregateReturnType; private final Optional metrics; private final List paramTypes; private final List params; private final Method method; private final String description; + private final UdafTypes types; + private final String aggregateSchema; + private final String outputSchema; + private ParamType aggregateReturnType; UdafFactoryInvoker( final Method method, @@ -70,10 +72,12 @@ class UdafFactoryInvoker implements FunctionSignature { if (!Modifier.isStatic(method.getModifiers())) { throw new KsqlException("UDAF factory methods must be static " + method); } - final UdafTypes types = new UdafTypes(method, functionName, typeParser); + this.types = new UdafTypes(method, functionName, typeParser); this.functionName = Objects.requireNonNull(functionName); - this.aggregateArgType = Objects.requireNonNull(types.getAggregateSchema(aggregateSchema)); - this.aggregateReturnType = Objects.requireNonNull(types.getOutputSchema(outputSchema)); + this.aggregateSchema = aggregateSchema; + this.outputSchema = outputSchema; + //this.aggregateArgType = Objects.requireNonNull(types.getAggregateSchema(aggregateSchema)); + //this.aggregateReturnType = Objects.requireNonNull(types.getOutputSchema(outputSchema)); this.metrics = Objects.requireNonNull(metrics); this.params = types.getInputSchema(Objects.requireNonNull(inputSchema)); this.paramTypes = params.stream().map(ParameterInfo::type).collect(Collectors.toList()); @@ -95,10 +99,14 @@ KsqlAggregateFunction createFunction(final AggregateFunctionInitArguments initAr } final SqlType aggregateSqlType = (SqlType) udaf.getAggregateSqlType() - .orElseGet(() -> SchemaConverters.functionToSqlConverter().toSqlType(aggregateArgType)); + .orElseGet(() -> SchemaConverters.functionToSqlConverter() + .toSqlType(types.getAggregateSchema(aggregateSchema))); final SqlType returnSqlType = (SqlType) udaf.getReturnSqlType() .orElseGet(() -> - SchemaConverters.functionToSqlConverter().toSqlType(aggregateReturnType)); + SchemaConverters.functionToSqlConverter() + .toSqlType(types.getOutputSchema(outputSchema))); + this.aggregateReturnType = + SchemaConverters.sqlToFunctionConverter().toFunctionType(returnSqlType); final KsqlAggregateFunction function; if (TableUdaf.class.isAssignableFrom(method.getReturnType())) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/offset/EarliestByOffset.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/offset/EarliestByOffset.java index a8bbe4588b0d..b44a76361259 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/offset/EarliestByOffset.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/offset/EarliestByOffset.java @@ -16,15 +16,6 @@ package io.confluent.ksql.function.udaf.offset; import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.INTERMEDIATE_STRUCT_COMPARATOR; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_BOOLEAN; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_BYTES; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_DATE; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_DOUBLE; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_INTEGER; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_LONG; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_STRING; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_TIME; -import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_TIMESTAMP; import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.VAL_FIELD; import com.google.common.annotations.VisibleForTesting; @@ -32,18 +23,20 @@ import io.confluent.ksql.function.udaf.Udaf; import io.confluent.ksql.function.udaf.UdafDescription; import io.confluent.ksql.function.udaf.UdafFactory; +import io.confluent.ksql.schema.ksql.SchemaConverters; +import io.confluent.ksql.schema.ksql.SqlArgument; +import io.confluent.ksql.schema.ksql.types.SqlArray; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.KsqlConstants; -import java.nio.ByteBuffer; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +// name = "EARLIEST_BY_OFFSET_OLD", @UdafDescription( name = "EARLIEST_BY_OFFSET", description = EarliestByOffset.DESCRIPTION, @@ -59,249 +52,25 @@ private EarliestByOffset() { static final AtomicLong sequence = new AtomicLong(); - @UdafFactory(description = "return the earliest value of an integer column", - aggregateSchema = "STRUCT") - public static Udaf earliestInteger() { - return earliestInteger(true); + @UdafFactory(description = "return the earliest value of a column") + public static Udaf earliest() { + return earliest(true); } - @UdafFactory(description = "return the earliest value of an integer column", - aggregateSchema = "STRUCT") - public static Udaf earliestInteger(final boolean ignoreNulls) { - return earliest(STRUCT_INTEGER, ignoreNulls); + @UdafFactory(description = "return the earliest value of a column") + public static Udaf earliest(final boolean ignoreNulls) { + return earliestT(ignoreNulls); } - @UdafFactory(description = "return the earliest N values of an integer column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestIntegers(final int earliestN) { - return earliestIntegers(earliestN, true); + @UdafFactory(description = "return the earliest N values of a column") + public static Udaf, List> earliest(final int earliestN) { + return earliest(earliestN, true); } - @UdafFactory(description = "return the earliest N values of an integer column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestIntegers( - final int earliestN, - final boolean ignoreNulls - ) { - return earliestN(STRUCT_INTEGER, earliestN, ignoreNulls); - } - - @UdafFactory(description = "return the earliest value of an big integer column", - aggregateSchema = "STRUCT") - public static Udaf earliestLong() { - return earliestLong(true); - } - - @UdafFactory(description = "return the earliest value of an big integer column", - aggregateSchema = "STRUCT") - public static Udaf earliestLong(final boolean ignoreNulls) { - return earliest(STRUCT_LONG, ignoreNulls); - } - - @UdafFactory(description = "return the earliest N values of an long column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestLongs(final int earliestN) { - return earliestLongs(earliestN, true); - } - - @UdafFactory(description = "return the earliest N values of an long column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestLongs( - final int earliestN, - final boolean ignoreNulls - ) { - return earliestN(STRUCT_LONG, earliestN, ignoreNulls); - } - - @UdafFactory(description = "return the earliest value of a double column", - aggregateSchema = "STRUCT") - public static Udaf earliestDouble() { - return earliestDouble(true); - } - - @UdafFactory(description = "return the earliest value of a double column", - aggregateSchema = "STRUCT") - public static Udaf earliestDouble(final boolean ignoreNulls) { - return earliest(STRUCT_DOUBLE, ignoreNulls); - } - - @UdafFactory(description = "return the earliest N values of a double column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestDoubles(final int earliestN) { - return earliestDoubles(earliestN, true); - } - - @UdafFactory(description = "return the earliest N values of a double column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestDoubles( - final int earliestN, - final boolean ignoreNulls - ) { - return earliestN(STRUCT_DOUBLE, earliestN, ignoreNulls); - } - - @UdafFactory(description = "return the earliest value of a boolean column", - aggregateSchema = "STRUCT") - public static Udaf earliestBoolean() { - return earliestBoolean(true); - } - - @UdafFactory(description = "return the earliest value of a boolean column", - aggregateSchema = "STRUCT") - public static Udaf earliestBoolean(final boolean ignoreNulls) { - return earliest(STRUCT_BOOLEAN, ignoreNulls); - } - - @UdafFactory(description = "return the earliest N values of a boolean column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestBooleans(final int earliestN) { - return earliestBooleans(earliestN, true); - } - - @UdafFactory(description = "return the earliest N values of a boolean column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestBooleans( - final int earliestN, - final boolean ignoreNulls - ) { - return earliestN(STRUCT_BOOLEAN, earliestN, ignoreNulls); - } - - @UdafFactory(description = "return the earliest value of a string column", - aggregateSchema = "STRUCT") - public static Udaf earliestString() { - return earliestString(true); - } - - @UdafFactory(description = "return the earliest value of a string column", - aggregateSchema = "STRUCT") - public static Udaf earliestString(final boolean ignoreNulls) { - return earliest(STRUCT_STRING, ignoreNulls); - } - - @UdafFactory(description = "return the earliest N values of a string column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestStrings(final int earliestN) { - return earliestStrings(earliestN, true); - } - - @UdafFactory(description = "return the earliest N values of a string column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestStrings( - final int earliestN, - final boolean ignoreNulls - ) { - return earliestN(STRUCT_STRING, earliestN, ignoreNulls); - } - - @UdafFactory(description = "return the earliest value of a timestamp column", - aggregateSchema = "STRUCT") - public static Udaf earliestTimestamp() { - return earliestTimestamp(true); - } - - @UdafFactory(description = "return the earliest value of a timestamp column", - aggregateSchema = "STRUCT") - public static Udaf earliestTimestamp(final boolean ignoreNulls) { - return earliest(STRUCT_TIMESTAMP, ignoreNulls); - } - - @UdafFactory(description = "return the earliest N values of a timestamp column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestTimestamps( - final int earliestN) { - return earliestTimestamps(earliestN, true); - } - - @UdafFactory(description = "return the earliest N values of a timestamp column", - aggregateSchema = "ARRAY>") - public static Udaf, List> earliestTimestamps( - final int earliestN, - final boolean ignoreNulls - ) { - return earliestN(STRUCT_TIMESTAMP, earliestN, ignoreNulls); - } - - @UdafFactory(description = "return the earliest value of a time column", - aggregateSchema = "STRUCT") - public static Udaf earliestTime() { - return earliestTime(true); - } - - @UdafFactory(description = "return the earliest value of a time column", - aggregateSchema = "STRUCT") - public static Udaf earliestTime(final boolean ignoreNulls) { - return earliest(STRUCT_TIME, ignoreNulls); - } - - @UdafFactory(description = "return the earliest N values of a time column", - aggregateSchema = "ARRAY>") - public static Udaf, List