diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index a195ee15940a..cc6259313c98 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -138,15 +138,13 @@ The square root of a value. ## Collections -### `ARRAY_LENGTH` +### `ARRAY` ```sql -ARRAY_LENGTH(ARRAY[1, 2, 3]) +ARRAY[col1, col2, ...] ``` -Given an array, return the number of elements in the array. - -If the supplied parameter is NULL the method returns NULL. +Construct an array from a variable number of inputs. ### ``ARRAY_CONTAINS`` @@ -158,31 +156,62 @@ Given an array, checks if a search value is contained in the array. Accepts any `ARRAY` type. The type of the second param must match the element type of the `ARRAY`. -### `JSON_ARRAY_CONTAINS` +### `ARRAY_LENGTH` ```sql -JSON_ARRAY_CONTAINS('[1, 2, 3]', 3) +ARRAY_LENGTH(ARRAY[1, 2, 3]) ``` -Given a `STRING` containing a JSON array, checks if a search value is contained in the array. +Given an array, return the number of elements in the array. -Returns `false` if the first parameter does not contain a JSON array. +If the supplied parameter is NULL the method returns NULL. -### `ARRAY` +### ``ARRAY_MAX`` ```sql -ARRAY[col1, col2, ...] +ARRAY_MAX(['foo', 'bar', 'baz']) ``` -Construct an array from a variable number of inputs. +Returns the maximum value from within a given array of primitive elements (not arrays of other arrays, or maps, or structs, or combinations thereof). -### `MAP` +Array entries are compared according to their natural sort order, which sorts the various data-types per the following examples: +- ```array_max[-1, 2, NULL, 0] -> 2``` +- ```array_max[false, NULL, true] -> true``` +- ```array_max['Foo', 'Bar', NULL, 'baz'] -> 'baz'``` (lower-case characters are "greater" than upper-case characters) + +If the array field is NULL, or contains only NULLs, then NULL is returned. 
+ +### ``ARRAY_MIN`` ```sql -MAP(key VARCHAR := value, ...) +ARRAY_MIN(['foo', 'bar', 'baz']) ``` -Construct a map from specific key-value tuples. +Returns the minimum value from within a given array of primitive elements (not arrays of other arrays, or maps, or structs, or combinations thereof). + +Array entries are compared according to their natural sort order, which sorts the various data-types per the following examples: +- ```array_min[-1, 2, NULL, 0] -> -1``` +- ```array_min[false, NULL, true] -> false``` +- ```array_min['Foo', 'Bar', NULL, 'baz'] -> 'Bar'``` + +If the array field is NULL, or contains only NULLs, then NULL is returned. + +### ``ARRAY_SORT`` + +```sql +ARRAY_SORT(['foo', 'bar', 'baz'], 'ASC|DESC') +``` + +Given an array of primitive elements (not arrays of other arrays, or maps, or structs, or combinations thereof), returns an array of the same elements sorted according to their natural sort order. Any NULLs contained in the array will always be moved to the end. + +For example: +- ```array_sort[-1, 2, NULL, 0] -> [-1, 0, 2, NULL]``` +- ```array_sort[false, NULL, true] -> [false, true, NULL]``` +- ```array_sort['Foo', 'Bar', NULL, 'baz'] -> ['Bar', 'Foo', 'baz', NULL]``` + +If the array field is NULL then NULL is returned. + +An optional second parameter can be used to specify whether to sort the elements in 'ASC'ending or 'DESC'ending order. If neither is specified then the default is ascending order. ### `AS_MAP` @@ -212,6 +241,25 @@ Returns the 1-indexed position of `str` in `args`, or 0 if not found. If `str` is NULL, the return value is 0, because NULL is not considered to be equal to any value. FIELD is the complement to ELT. +### `JSON_ARRAY_CONTAINS` + +```sql +JSON_ARRAY_CONTAINS('[1, 2, 3]', 3) +``` + +Given a `STRING` containing a JSON array, checks if a search value is contained in the array. + +Returns `false` if the first parameter does not contain a JSON array. + +### `MAP` + +```sql +MAP(key VARCHAR := value, ...) 
+``` + +Construct a map from specific key-value tuples. + + ### `SLICE` ```sql diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayMax.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayMax.java new file mode 100644 index 000000000000..ce9e8024e038 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayMax.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.array; + +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import java.util.List; + +/** + * This UDF traverses the elements of an Array field to find and return the maximum contained value. + */ +@UdfDescription( + name = "array_max", + description = "Return the maximum value from within an array of primitive values, according to" + + " their natural sort order. 
If the array is NULL, or contains only NULLs, return NULL.") +public class ArrayMax { + + @Udf + public > T arrayMax(@UdfParameter( + description = "Array of values from which to find the maximum") final List input) { + if (input == null) { + return null; + } + + T candidate = (T) null; + for (T thisVal : input) { + if (thisVal != null) { + if (candidate == null) { + candidate = thisVal; + } else if (thisVal.compareTo(candidate) > 0) { + candidate = thisVal; + } + } + } + return candidate; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayMin.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayMin.java new file mode 100644 index 000000000000..f50417580a2a --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayMin.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.array; + +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import java.util.List; + +/** + * This UDF traverses the elements of an Array field to find and return the minimum contained value. + */ +@UdfDescription( + name = "array_min", + description = "Return the minimum value from within an array of primitive values, according to" + + " their natural sort order. 
If the array is NULL, or contains only NULLs, return NULL.") +public class ArrayMin { + + @Udf + public > T arrayMin(@UdfParameter( + description = "Array of values from which to find the minimum") final List input) { + if (input == null) { + return null; + } + + T candidate = (T) null; + for (T thisVal : input) { + if (thisVal != null) { + if (candidate == null) { + candidate = thisVal; + } else if (thisVal.compareTo(candidate) < 0) { + candidate = thisVal; + } + } + } + return candidate; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArraySort.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArraySort.java new file mode 100644 index 000000000000..11c5e528540d --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/array/ArraySort.java @@ -0,0 +1,63 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.array; + +import static java.util.Comparator.naturalOrder; +import static java.util.Comparator.nullsLast; + +import com.google.common.collect.Lists; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import java.util.Collections; +import java.util.List; + +/** + * This UDF sorts the elements of an array according to their natural sort order. 
+ */ +@UdfDescription( + name = "array_sort", + description = "Sort an array of primitive values, according to their natural sort order. Any " + + "NULLs in the array will be placed at the end.") +public class ArraySort { + + private static final List SORT_DIRECTION_ASC = Lists.newArrayList("ASC", "ASCENDING"); + private static final List SORT_DIRECTION_DESC = Lists.newArrayList("DESC", "DESCENDING"); + + @Udf + public > List arraySortDefault(@UdfParameter( + description = "The array to sort") final List input) { + return arraySortWithDirection(input, "ASC"); + } + + @Udf + public > List arraySortWithDirection(@UdfParameter( + description = "The array to sort") final List input, + @UdfParameter( + description = "Marks the end of the series (inclusive)") final String direction) { + if (input == null) { + return null; + } + if (SORT_DIRECTION_ASC.contains(direction.toUpperCase())) { + input.sort(nullsLast(naturalOrder())); + } else if (SORT_DIRECTION_DESC.contains(direction.toUpperCase())) { + input.sort(nullsLast(Collections.reverseOrder())); + } else { + return null; + } + return input; + } + +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayMaxTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayMaxTest.java new file mode 100644 index 000000000000..8605162912cb --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayMaxTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.array; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class ArrayMaxTest { + + private final ArrayMax udf = new ArrayMax(); + + @Test + public void shouldFindBoolMax() { + final List input = Arrays.asList(true, false, false); + assertThat(udf.arrayMax(input), is(Boolean.TRUE)); + } + + @Test + public void shouldFindIntMax() { + final List input = Arrays.asList(1, 3, -2); + assertThat(udf.arrayMax(input), is(3)); + } + + @Test + public void shouldFindBigIntMax() { + final List input = Arrays.asList(1L, 3L, -2L); + assertThat(udf.arrayMax(input), is(Long.valueOf(3))); + } + + @Test + public void shouldFindDoubleMax() { + final List input = + Arrays.asList(Double.valueOf(1.1), Double.valueOf(3.1), Double.valueOf(-1.1)); + assertThat(udf.arrayMax(input), is(Double.valueOf(3.1))); + } + + @Test + public void shouldFindStringMax() { + final List input = Arrays.asList("foo", "food", "bar"); + assertThat(udf.arrayMax(input), is("food")); + } + + @Test + public void shouldFindStringMaxMixedCase() { + final List input = Arrays.asList("foo", "Food", "bar"); + assertThat(udf.arrayMax(input), is("foo")); + } + + @Test + public void shouldFindDecimalMax() { + final List input = + Arrays.asList(BigDecimal.valueOf(1.2), BigDecimal.valueOf(1.3), BigDecimal.valueOf(-1.2)); + assertThat(udf.arrayMax(input), is(BigDecimal.valueOf(1.3))); + } + + @Test + public void shouldReturnNullForNullInput() { + assertThat(udf.arrayMax((List) null), is(nullValue())); + } + + @Test + public void shouldReturnNullForListOfNullInput() { + final List input = Arrays.asList(null, null, null); + assertThat(udf.arrayMax(input), 
is(nullValue())); + } + + @Test + public void shouldReturnValueForMixedInput() { + final List input = Arrays.asList(null, "foo", null, "bar", null); + assertThat(udf.arrayMax(input), is("foo")); + } + +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayMinTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayMinTest.java new file mode 100644 index 000000000000..7e31f1cddca0 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayMinTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. 
+ */ + +package io.confluent.ksql.function.udf.array; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class ArrayMinTest { + + private final ArrayMin udf = new ArrayMin(); + + @Test + public void shouldFindBoolMin() { + final List input = Arrays.asList(true, false, false); + assertThat(udf.arrayMin(input), is(Boolean.FALSE)); + } + + @Test + public void shouldFindIntMin() { + final List input = Arrays.asList(1, 3, -2); + assertThat(udf.arrayMin(input), is(-2)); + } + + @Test + public void shouldFindBigIntMin() { + final List input = Arrays.asList(1L, 3L, -2L); + assertThat(udf.arrayMin(input), is(Long.valueOf(-2))); + } + + @Test + public void shouldFindDoubleMin() { + final List input = + Arrays.asList(Double.valueOf(1.1), Double.valueOf(3.1), Double.valueOf(-1.1)); + assertThat(udf.arrayMin(input), is(Double.valueOf(-1.1))); + } + + @Test + public void shouldFindStringMin() { + final List input = Arrays.asList("foo", "food", "bar"); + assertThat(udf.arrayMin(input), is("bar")); + } + + @Test + public void shouldFindStringMinMixedCase() { + final List input = Arrays.asList("foo", "Food", "bar", "Bar", "Baz"); + assertThat(udf.arrayMin(input), is("Bar")); + } + + @Test + public void shouldFindDecimalMin() { + final List input = + Arrays.asList(BigDecimal.valueOf(1.2), BigDecimal.valueOf(1.3), BigDecimal.valueOf(-1.2)); + assertThat(udf.arrayMin(input), is(BigDecimal.valueOf(-1.2))); + } + + @Test + public void shouldReturnNullForNullInput() { + assertThat(udf.arrayMin((List) null), is(nullValue())); + } + + @Test + public void shouldReturnNullForListOfNullInput() { + final List input = Arrays.asList(null, null, null); + assertThat(udf.arrayMin(input), is(nullValue())); + } + + @Test + public void shouldReturnValueForMixedInput() { + final List input = 
Arrays.asList(null, "foo", null, "bar", null); + assertThat(udf.arrayMin(input), is("bar")); + } + +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArraySortTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArraySortTest.java new file mode 100644 index 000000000000..69fdfc6556c2 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/array/ArraySortTest.java @@ -0,0 +1,123 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. 
+ */ + +package io.confluent.ksql.function.udf.array; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class ArraySortTest { + + private final ArraySort udf = new ArraySort(); + + @Test + public void shouldSortBools() { + final List input = Arrays.asList(true, false, false); + final List output = udf.arraySortDefault(input); + assertThat(output, contains(Boolean.FALSE, Boolean.FALSE, Boolean.TRUE)); + } + + @Test + public void shouldSortInts() { + final List input = Arrays.asList(1, 3, -2); + final List output = udf.arraySortDefault(input); + assertThat(output, contains(-2, 1, 3)); + } + + @Test + public void shouldSortIntsAscending() { + final List input = Arrays.asList(1, 3, -2); + final List output = udf.arraySortWithDirection(input, "ascEnDing"); + assertThat(output, contains(-2, 1, 3)); + } + + @Test + public void shouldSortIntsDescending() { + final List input = Arrays.asList(1, 3, -2); + final List output = udf.arraySortWithDirection(input, "DEsc"); + assertThat(output, contains(3, 1, -2)); + } + + @Test + public void shouldSortBigInts() { + final List input = Arrays.asList(1L, 3L, -2L); + final List output = udf.arraySortDefault(input); + assertThat(output, contains(-2L, 1L, 3L)); + } + + @Test + public void shouldSortDoubles() { + final List input = + Arrays.asList(Double.valueOf(1.1), Double.valueOf(3.1), Double.valueOf(-1.1)); + final List output = udf.arraySortDefault(input); + assertThat(output, contains(Double.valueOf(-1.1), Double.valueOf(1.1), Double.valueOf(3.1))); + } + + @Test + public void shouldSortStrings() { + final List input = Arrays.asList("foo", "food", "bar"); + final List output = udf.arraySortDefault(input); + assertThat(output, contains("bar", "foo", "food")); + } + + @Test + 
public void shouldSortStringsMixedCase() { + final List input = Arrays.asList("foo", "Food", "bar", "Bar", "Baz"); + final List output = udf.arraySortDefault(input); + assertThat(output, contains("Bar", "Baz", "Food", "bar", "foo")); + } + + @Test + public void shouldSortDecimals() { + final List input = + Arrays.asList(BigDecimal.valueOf(1.2), BigDecimal.valueOf(1.3), BigDecimal.valueOf(-1.2)); + final List output = udf.arraySortDefault(input); + assertThat(output, + contains(BigDecimal.valueOf(-1.2), BigDecimal.valueOf(1.2), BigDecimal.valueOf(1.3))); + } + + @Test + public void shouldReturnNullForNullInput() { + assertThat(udf.arraySortDefault((List) null), is(nullValue())); + } + + @Test + public void shouldReturnNullForListOfNullInput() { + final List input = Arrays.asList(null, null, null); + assertThat(udf.arraySortDefault(input), + contains((Integer) null, (Integer) null, (Integer) null)); + } + + @Test + public void shouldSortNullsToEnd() { + final List input = Arrays.asList(null, "foo", null, "bar", null); + final List output = udf.arraySortDefault(input); + assertThat(output, contains("bar", "foo", null, null, null)); + } + + @Test + public void shouldSortNullsToEndDescending() { + final List input = Arrays.asList(null, "foo", null, "bar", null); + final List output = udf.arraySortWithDirection(input, "desc"); + assertThat(output, contains("foo", "bar", null, null, null)); + } + +} diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/plan.json new file mode 100644 index 000000000000..a76753159660 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, BOOL_ARRAY 
ARRAY, INT_ARRAY ARRAY, BIGINT_ARRAY ARRAY, DOUBLE_ARRAY ARRAY, STRING_ARRAY ARRAY, DECIMAL_ARRAY ARRAY) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY, `INT_ARRAY` ARRAY, `BIGINT_ARRAY` ARRAY, `DOUBLE_ARRAY` ARRAY, `STRING_ARRAY` ARRAY, `DECIMAL_ARRAY` ARRAY", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_MAX(INPUT.BOOL_ARRAY) BOOL_MAX,\n ARRAY_MAX(INPUT.INT_ARRAY) INT_MAX,\n ARRAY_MAX(INPUT.BIGINT_ARRAY) BIGINT_MAX,\n ARRAY_MAX(INPUT.DOUBLE_ARRAY) DOUBLE_MAX,\n ARRAY_MAX(INPUT.STRING_ARRAY) STRING_MAX,\n ARRAY_MAX(INPUT.DECIMAL_ARRAY) DECIMAL_MAX\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `BOOL_MAX` BOOLEAN, `INT_MAX` INTEGER, `BIGINT_MAX` BIGINT, `DOUBLE_MAX` DOUBLE, `STRING_MAX` STRING, `DECIMAL_MAX` DECIMAL(2, 1)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY, `INT_ARRAY` ARRAY, `BIGINT_ARRAY` ARRAY, `DOUBLE_ARRAY` ARRAY, `STRING_ARRAY` ARRAY, `DECIMAL_ARRAY` ARRAY" + }, + 
"keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ARRAY_MAX(BOOL_ARRAY) AS BOOL_MAX", "ARRAY_MAX(INT_ARRAY) AS INT_MAX", "ARRAY_MAX(BIGINT_ARRAY) AS BIGINT_MAX", "ARRAY_MAX(DOUBLE_ARRAY) AS DOUBLE_MAX", "ARRAY_MAX(STRING_ARRAY) AS STRING_MAX", "ARRAY_MAX(DECIMAL_ARRAY) AS DECIMAL_MAX" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : 
"_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/spec.json new file mode 100644 index 000000000000..e69bca469a55 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/spec.json @@ -0,0 +1,117 @@ +{ + "version" : "6.0.0", + "timestamp" : 1590779902575, + "path" : "query-validation-tests\\array-min-max-sort.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT, INT_ARRAY ARRAY, BIGINT_ARRAY ARRAY, DOUBLE_ARRAY ARRAY, STRING_ARRAY ARRAY, DECIMAL_ARRAY ARRAY> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "array_max", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "bool_array" : [ false, true, false ], + "int_array" : [ 0, 0, 1, 0, -1 ], + "bigint_array" : [ 234, -123, 345 ], + "double_array" : [ 0.0, 0.1, -12345.678, 
0.2, 0.3 ], + "string_array" : [ "foo", "bar" ], + "decimal_array" : [ 1.0, 1.1, 1.2, -0.2, 1.9, 9.0, -9.9, 1.5 ] + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + "bool_array" : [ null, false, true ], + "int_array" : [ 0, null, 1, 0, -1 ], + "bigint_array" : [ null, -123, 345 ], + "double_array" : [ 0.0, 0.1, -12345.678, null, 0.3 ], + "string_array" : [ "foo", "fo", "Food", null, "F", "food" ], + "decimal_array" : [ 1.0, 1.1, 1.2, -0.2, null, 9.0 ] + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "bool_array" : [ ], + "int_array" : [ ], + "bigint_array" : [ ], + "double_array" : [ ], + "string_array" : [ ], + "decimal_array" : [ ] + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "BOOL_MAX" : true, + "INT_MAX" : 1, + "BIGINT_MAX" : 345, + "DOUBLE_MAX" : 0.3, + "STRING_MAX" : "foo", + "DECIMAL_MAX" : 9.0 + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "BOOL_MAX" : true, + "INT_MAX" : 1, + "BIGINT_MAX" : 345, + "DOUBLE_MAX" : 0.3, + "STRING_MAX" : "food", + "DECIMAL_MAX" : 9.0 + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "BOOL_MAX" : null, + "INT_MAX" : null, + "BIGINT_MAX" : null, + "DOUBLE_MAX" : null, + "STRING_MAX" : null, + "DECIMAL_MAX" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID STRING KEY, bool_array ARRAY, int_array ARRAY, bigint_array ARRAY, double_array ARRAY, string_array ARRAY, decimal_array ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, array_max(bool_array) as bool_max, array_max(int_array) as int_max, array_max(bigint_array) as bigint_max, array_max(double_array) as double_max, array_max(string_array) as string_max, array_max(decimal_array) as decimal_max FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + 
"name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_max/6.0.0_1590779902575/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/plan.json new file mode 100644 index 000000000000..984a9308e795 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, BOOL_ARRAY ARRAY, INT_ARRAY ARRAY, BIGINT_ARRAY ARRAY, DOUBLE_ARRAY ARRAY, STRING_ARRAY ARRAY, DECIMAL_ARRAY ARRAY) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + 
"@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY, `INT_ARRAY` ARRAY, `BIGINT_ARRAY` ARRAY, `DOUBLE_ARRAY` ARRAY, `STRING_ARRAY` ARRAY, `DECIMAL_ARRAY` ARRAY", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_MIN(INPUT.BOOL_ARRAY) BOOL_MIN,\n ARRAY_MIN(INPUT.INT_ARRAY) INT_MIN,\n ARRAY_MIN(INPUT.BIGINT_ARRAY) BIGINT_MIN,\n ARRAY_MIN(INPUT.DOUBLE_ARRAY) DOUBLE_MIN,\n ARRAY_MIN(INPUT.STRING_ARRAY) STRING_MIN,\n ARRAY_MIN(INPUT.DECIMAL_ARRAY) DECIMAL_MIN\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `BOOL_MIN` BOOLEAN, `INT_MIN` INTEGER, `BIGINT_MIN` BIGINT, `DOUBLE_MIN` DOUBLE, `STRING_MIN` STRING, `DECIMAL_MIN` DECIMAL(2, 1)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY, `INT_ARRAY` ARRAY, `BIGINT_ARRAY` ARRAY, `DOUBLE_ARRAY` ARRAY, `STRING_ARRAY` ARRAY, `DECIMAL_ARRAY` ARRAY" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ARRAY_MIN(BOOL_ARRAY) AS BOOL_MIN", "ARRAY_MIN(INT_ARRAY) AS INT_MIN", "ARRAY_MIN(BIGINT_ARRAY) AS BIGINT_MIN", "ARRAY_MIN(DOUBLE_ARRAY) 
AS DOUBLE_MIN", "ARRAY_MIN(STRING_ARRAY) AS STRING_MIN", "ARRAY_MIN(DECIMAL_ARRAY) AS DECIMAL_MIN" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + 
"ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/spec.json new file mode 100644 index 000000000000..95959473267d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/spec.json @@ -0,0 +1,117 @@ +{ + "version" : "6.0.0", + "timestamp" : 1590779902895, + "path" : "query-validation-tests\\array-min-max-sort.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT, INT_ARRAY ARRAY, BIGINT_ARRAY ARRAY, DOUBLE_ARRAY ARRAY, STRING_ARRAY ARRAY, DECIMAL_ARRAY ARRAY> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "array_min", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "bool_array" : [ false, true, false ], + "int_array" : [ 0, 0, 1, 0, -1 ], + "bigint_array" : [ 234, -123, 345 ], + "double_array" : [ 0.0, 0.1, -12345.678, 0.2, 0.3 ], + "string_array" : [ "foo", "bar" ], + "decimal_array" : [ 1.0, 1.1, 1.2, -0.2, 1.9, 9.0, -9.9, 1.5 ] + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + 
"bool_array" : [ null, false, true ], + "int_array" : [ 0, null, 1, 0, -1 ], + "bigint_array" : [ null, -123, 345 ], + "double_array" : [ 0.0, 0.1, -12345.678, null, 0.3 ], + "string_array" : [ "foo", "fo", "Food", null, "F", "food" ], + "decimal_array" : [ 1.0, 1.1, 1.2, -0.2, null, 9.0 ] + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "bool_array" : [ ], + "int_array" : [ ], + "bigint_array" : [ ], + "double_array" : [ ], + "string_array" : [ ], + "decimal_array" : [ ] + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "BOOL_MIN" : false, + "INT_MIN" : -1, + "BIGINT_MIN" : -123, + "DOUBLE_MIN" : -12345.678, + "STRING_MIN" : "bar", + "DECIMAL_MIN" : -9.9 + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "BOOL_MIN" : false, + "INT_MIN" : -1, + "BIGINT_MIN" : -123, + "DOUBLE_MIN" : -12345.678, + "STRING_MIN" : "F", + "DECIMAL_MIN" : -0.2 + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "BOOL_MIN" : null, + "INT_MIN" : null, + "BIGINT_MIN" : null, + "DOUBLE_MIN" : null, + "STRING_MIN" : null, + "DECIMAL_MIN" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID STRING KEY, bool_array ARRAY, int_array ARRAY, bigint_array ARRAY, double_array ARRAY, string_array ARRAY, decimal_array ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, array_min(bool_array) as bool_min, array_min(int_array) as int_min, array_min(bigint_array) as bigint_min, array_min(double_array) as double_min, array_min(string_array) as string_min, array_min(decimal_array) as decimal_min FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" 
: "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_min/6.0.0_1590779902895/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/plan.json new file mode 100644 index 000000000000..db4cbddc90f7 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, BOOL_ARRAY ARRAY, INT_ARRAY ARRAY, BIGINT_ARRAY ARRAY, DOUBLE_ARRAY ARRAY, STRING_ARRAY ARRAY, DECIMAL_ARRAY ARRAY) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY, `INT_ARRAY` ARRAY, `BIGINT_ARRAY` ARRAY, 
`DOUBLE_ARRAY` ARRAY, `STRING_ARRAY` ARRAY, `DECIMAL_ARRAY` ARRAY", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_SORT(INPUT.BOOL_ARRAY) BOOLS,\n ARRAY_SORT(INPUT.INT_ARRAY) INTS,\n ARRAY_SORT(INPUT.BIGINT_ARRAY) BIGINTS,\n ARRAY_SORT(INPUT.DOUBLE_ARRAY) DOUBLES,\n ARRAY_SORT(INPUT.STRING_ARRAY) STRINGS,\n ARRAY_SORT(INPUT.DECIMAL_ARRAY) DECIMALS\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `BOOLS` ARRAY, `INTS` ARRAY, `BIGINTS` ARRAY, `DOUBLES` ARRAY, `STRINGS` ARRAY, `DECIMALS` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY, `INT_ARRAY` ARRAY, `BIGINT_ARRAY` ARRAY, `DOUBLE_ARRAY` ARRAY, `STRING_ARRAY` ARRAY, `DECIMAL_ARRAY` ARRAY" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ARRAY_SORT(BOOL_ARRAY) AS BOOLS", "ARRAY_SORT(INT_ARRAY) AS INTS", "ARRAY_SORT(BIGINT_ARRAY) AS BIGINTS", "ARRAY_SORT(DOUBLE_ARRAY) AS DOUBLES", "ARRAY_SORT(STRING_ARRAY) AS STRINGS", "ARRAY_SORT(DECIMAL_ARRAY) AS DECIMALS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : 
"JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + 
"ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/spec.json new file mode 100644 index 000000000000..8c3e755601df --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/spec.json @@ -0,0 +1,139 @@ +{ + "version" : "6.0.0", + "timestamp" : 1590779903355, + "path" : "query-validation-tests\\array-min-max-sort.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT, INT_ARRAY ARRAY, BIGINT_ARRAY ARRAY, DOUBLE_ARRAY ARRAY, STRING_ARRAY ARRAY, DECIMAL_ARRAY ARRAY> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT, INTS ARRAY, BIGINTS ARRAY, DOUBLES ARRAY, STRINGS ARRAY, DECIMALS ARRAY> NOT NULL" + }, + "testCase" : { + "name" : "array_sort_asc", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "bool_array" : [ false, true, false ], + "int_array" : [ 0, 0, 1, 0, -1 ], + "bigint_array" : [ 234, -123, 345 ], + "double_array" : [ 0.0, 0.1, -12345.678, 0.2, 0.3 ], + "string_array" : [ "foo", "bar" ], + "decimal_array" : [ 1.0, 1.1, -0.2, 1.9, 9.0, -9.9 ] + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + "bool_array" : [ null, false, true ], + "int_array" : [ 0, null, 1, 0, -1 ], + "bigint_array" : [ null, -123, 345 ], + 
"double_array" : [ 0.0, 0.1, -12345.678, null, 0.3 ], + "string_array" : [ "foo", "fo", "Food", null, "F", "food" ], + "decimal_array" : [ 1.0, 1.1, 1.2, -0.2, null, 9.0 ] + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "bool_array" : [ ], + "int_array" : [ ], + "bigint_array" : [ ], + "double_array" : [ ], + "string_array" : [ ], + "decimal_array" : [ ] + } + }, { + "topic" : "test_topic", + "key" : "r4", + "value" : { + "bool_array" : null, + "int_array" : null, + "bigint_array" : null, + "double_array" : null, + "string_array" : null, + "decimal_array" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "BOOLS" : [ false, false, true ], + "INTS" : [ -1, 0, 0, 0, 1 ], + "BIGINTS" : [ -123, 234, 345 ], + "DOUBLES" : [ -12345.678, 0.0, 0.1, 0.2, 0.3 ], + "STRINGS" : [ "bar", "foo" ], + "DECIMALS" : [ -9.9, -0.2, 1.0, 1.1, 1.9, 9.0 ] + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "BOOLS" : [ false, true, null ], + "INTS" : [ -1, 0, 0, 1, null ], + "BIGINTS" : [ -123, 345, null ], + "DOUBLES" : [ -12345.678, 0.0, 0.1, 0.3, null ], + "STRINGS" : [ "F", "Food", "fo", "foo", "food", null ], + "DECIMALS" : [ -0.2, 1.0, 1.1, 1.2, 9.0, null ] + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "BOOLS" : [ ], + "INTS" : [ ], + "BIGINTS" : [ ], + "DOUBLES" : [ ], + "STRINGS" : [ ], + "DECIMALS" : [ ] + } + }, { + "topic" : "OUTPUT", + "key" : "r4", + "value" : { + "BOOLS" : null, + "INTS" : null, + "BIGINTS" : null, + "DOUBLES" : null, + "STRINGS" : null, + "DECIMALS" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID STRING KEY, bool_array ARRAY, int_array ARRAY, bigint_array ARRAY, double_array ARRAY, string_array ARRAY, decimal_array ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT 
AS SELECT ID, array_sort(bool_array) as bools, array_sort(int_array) as ints, array_sort(bigint_array) as bigints, array_sort(double_array) as doubles, array_sort(string_array) as strings, array_sort(decimal_array) as decimals FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_asc/6.0.0_1590779903355/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/plan.json new file mode 100644 index 000000000000..1649165b3a68 --- /dev/null +++ 
b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, BOOL_ARRAY ARRAY, INT_ARRAY ARRAY, BIGINT_ARRAY ARRAY, DOUBLE_ARRAY ARRAY, STRING_ARRAY ARRAY, DECIMAL_ARRAY ARRAY) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY, `INT_ARRAY` ARRAY, `BIGINT_ARRAY` ARRAY, `DOUBLE_ARRAY` ARRAY, `STRING_ARRAY` ARRAY, `DECIMAL_ARRAY` ARRAY", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_SORT(INPUT.BOOL_ARRAY, 'desc') BOOLS,\n ARRAY_SORT(INPUT.INT_ARRAY, 'desc') INTS,\n ARRAY_SORT(INPUT.BIGINT_ARRAY, 'desc') BIGINTS,\n ARRAY_SORT(INPUT.DOUBLE_ARRAY, 'desc') DOUBLES,\n ARRAY_SORT(INPUT.STRING_ARRAY, 'desc') STRINGS,\n ARRAY_SORT(INPUT.DECIMAL_ARRAY, 'desc') DECIMALS\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `BOOLS` ARRAY, `INTS` ARRAY, `BIGINTS` ARRAY, `DOUBLES` ARRAY, `STRINGS` ARRAY, `DECIMALS` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" 
: { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY, `INT_ARRAY` ARRAY, `BIGINT_ARRAY` ARRAY, `DOUBLE_ARRAY` ARRAY, `STRING_ARRAY` ARRAY, `DECIMAL_ARRAY` ARRAY" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ARRAY_SORT(BOOL_ARRAY, 'desc') AS BOOLS", "ARRAY_SORT(INT_ARRAY, 'desc') AS INTS", "ARRAY_SORT(BIGINT_ARRAY, 'desc') AS BIGINTS", "ARRAY_SORT(DOUBLE_ARRAY, 'desc') AS DOUBLES", "ARRAY_SORT(STRING_ARRAY, 'desc') AS STRINGS", "ARRAY_SORT(DECIMAL_ARRAY, 'desc') AS DECIMALS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + 
"ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/spec.json new file mode 100644 index 000000000000..4265104fb4ae --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/spec.json @@ -0,0 +1,139 @@ +{ + "version" : "6.0.0", + "timestamp" : 1590779903881, + "path" : "query-validation-tests\\array-min-max-sort.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT, INT_ARRAY ARRAY, BIGINT_ARRAY ARRAY, DOUBLE_ARRAY ARRAY, STRING_ARRAY ARRAY, DECIMAL_ARRAY ARRAY> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : 
"STRUCT, INTS ARRAY, BIGINTS ARRAY, DOUBLES ARRAY, STRINGS ARRAY, DECIMALS ARRAY> NOT NULL" + }, + "testCase" : { + "name" : "array_sort_desc", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "bool_array" : [ false, true, false ], + "int_array" : [ 0, 0, 1, 0, -1 ], + "bigint_array" : [ 234, -123, 345 ], + "double_array" : [ 0.0, 0.1, -12345.678, 0.2, 0.3 ], + "string_array" : [ "foo", "bar" ], + "decimal_array" : [ 1.0, 1.1, -0.2, 1.9, 9.0, -9.9 ] + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + "bool_array" : [ null, false, true ], + "int_array" : [ 0, null, 1, 0, -1 ], + "bigint_array" : [ null, -123, 345 ], + "double_array" : [ 0.0, 0.1, -12345.678, null, 0.3 ], + "string_array" : [ "foo", "fo", "Food", null, "F", "food" ], + "decimal_array" : [ 1.0, 1.1, 1.2, -0.2, null, 9.0 ] + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "bool_array" : [ ], + "int_array" : [ ], + "bigint_array" : [ ], + "double_array" : [ ], + "string_array" : [ ], + "decimal_array" : [ ] + } + }, { + "topic" : "test_topic", + "key" : "r4", + "value" : { + "bool_array" : null, + "int_array" : null, + "bigint_array" : null, + "double_array" : null, + "string_array" : null, + "decimal_array" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "BOOLS" : [ true, false, false ], + "INTS" : [ 1, 0, 0, 0, -1 ], + "BIGINTS" : [ 345, 234, -123 ], + "DOUBLES" : [ 0.3, 0.2, 0.1, 0.0, -12345.678 ], + "STRINGS" : [ "foo", "bar" ], + "DECIMALS" : [ 9.0, 1.9, 1.1, 1.0, -0.2, -9.9 ] + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "BOOLS" : [ true, false, null ], + "INTS" : [ 1, 0, 0, -1, null ], + "BIGINTS" : [ 345, -123, null ], + "DOUBLES" : [ 0.3, 0.1, 0.0, -12345.678, null ], + "STRINGS" : [ "food", "foo", "fo", "Food", "F", null ], + "DECIMALS" : [ 9.0, 1.2, 1.1, 1.0, -0.2, null ] + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "BOOLS" : [ ], + "INTS" : [ ], + 
"BIGINTS" : [ ], + "DOUBLES" : [ ], + "STRINGS" : [ ], + "DECIMALS" : [ ] + } + }, { + "topic" : "OUTPUT", + "key" : "r4", + "value" : { + "BOOLS" : null, + "INTS" : null, + "BIGINTS" : null, + "DOUBLES" : null, + "STRINGS" : null, + "DECIMALS" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID STRING KEY, bool_array ARRAY, int_array ARRAY, bigint_array ARRAY, double_array ARRAY, string_array ARRAY, decimal_array ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, array_sort(bool_array, 'desc') as bools, array_sort(int_array, 'desc') as ints, array_sort(bigint_array, 'desc') as bigints, array_sort(double_array, 'desc') as doubles, array_sort(string_array, 'desc') as strings, array_sort(decimal_array, 'desc') as decimals FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array-min-max-sort_-_array_sort_desc/6.0.0_1590779903881/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: 
KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/array-min-max-sort.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/array-min-max-sort.json new file mode 100644 index 000000000000..f36677da276b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/array-min-max-sort.json @@ -0,0 +1,127 @@ +{ + "comments": [ + "Tests covering the use of UDFs for sorting and finding the min/max values for arrays." + ], + "tests": [ + { + "name": "array_max", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, bool_array ARRAY, int_array ARRAY, bigint_array ARRAY, double_array ARRAY, string_array ARRAY, decimal_array ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ID, array_max(bool_array) as bool_max, array_max(int_array) as int_max, array_max(bigint_array) as bigint_max, array_max(double_array) as double_max, array_max(string_array) as string_max, array_max(decimal_array) as decimal_max FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"bool_array": [false, true, false], "int_array": [0,0,1,0,-1], "bigint_array": [234, -123, 345], "double_array": [0.0, 0.1, -12345.678, 0.2, 0.3], "string_array": ["foo", "bar"], "decimal_array": [1.0, 1.1, 1.2, -0.2, 1.9, 9.0, -9.9, 1.5]}}, + {"topic": "test_topic", "key": "r2", "value": {"bool_array": [null, false, true], "int_array": [0,null,1,0,-1], "bigint_array": [null, -123, 345], "double_array": [0.0, 0.1, -12345.678, null, 0.3], "string_array": ["foo", "fo", "Food", null, "F", "food"], "decimal_array": [1.0, 1.1, 1.2, -0.2, null, 9.0]}}, + {"topic": "test_topic", "key": "r3", "value": {"bool_array": [], "int_array": 
[], "bigint_array": [], "double_array": [], "string_array": [], "decimal_array": []}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": {"BOOL_MAX": true, "INT_MAX": 1, "BIGINT_MAX": 345, "DOUBLE_MAX": 0.3, "STRING_MAX": "foo", "DECIMAL_MAX": 9.0}}, + {"topic": "OUTPUT", "key": "r2", "value": {"BOOL_MAX": true, "INT_MAX": 1, "BIGINT_MAX": 345, "DOUBLE_MAX": 0.3, "STRING_MAX": "food", "DECIMAL_MAX": 9.0}}, + {"topic": "OUTPUT", "key": "r3", "value": {"BOOL_MAX": null, "INT_MAX": null, "BIGINT_MAX": null, "DOUBLE_MAX": null, "STRING_MAX": null, "DECIMAL_MAX": null}} + ] + }, + { + "name": "array_min", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, bool_array ARRAY, int_array ARRAY, bigint_array ARRAY, double_array ARRAY, string_array ARRAY, decimal_array ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ID, array_min(bool_array) as bool_min, array_min(int_array) as int_min, array_min(bigint_array) as bigint_min, array_min(double_array) as double_min, array_min(string_array) as string_min, array_min(decimal_array) as decimal_min FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"bool_array": [false, true, false], "int_array": [0,0,1,0,-1], "bigint_array": [234, -123, 345], "double_array": [0.0, 0.1, -12345.678, 0.2, 0.3], "string_array": ["foo", "bar"], "decimal_array": [1.0, 1.1, 1.2, -0.2, 1.9, 9.0, -9.9, 1.5]}}, + {"topic": "test_topic", "key": "r2", "value": {"bool_array": [null, false, true], "int_array": [0,null,1,0,-1], "bigint_array": [null, -123, 345], "double_array": [0.0, 0.1, -12345.678, null, 0.3], "string_array": ["foo", "fo", "Food", null, "F", "food"], "decimal_array": [1.0, 1.1, 1.2, -0.2, null, 9.0]}}, + {"topic": "test_topic", "key": "r3", "value": {"bool_array": [], "int_array": [], "bigint_array": [], "double_array": [], "string_array": [], "decimal_array": []}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": {"BOOL_MIN": false, 
"INT_MIN": -1, "BIGINT_MIN": -123, "DOUBLE_MIN": -12345.678, "STRING_MIN": "bar", "DECIMAL_MIN": -9.9}}, + {"topic": "OUTPUT", "key": "r2", "value": {"BOOL_MIN": false, "INT_MIN": -1, "BIGINT_MIN": -123, "DOUBLE_MIN": -12345.678, "STRING_MIN": "F", "DECIMAL_MIN": -0.2}}, + {"topic": "OUTPUT", "key": "r3", "value": {"BOOL_MIN": null, "INT_MIN": null, "BIGINT_MIN": null, "DOUBLE_MIN": null, "STRING_MIN": null, "DECIMAL_MIN": null}} + ] + }, + { + "name": "array_sort_asc", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, bool_array ARRAY, int_array ARRAY, bigint_array ARRAY, double_array ARRAY, string_array ARRAY, decimal_array ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ID, array_sort(bool_array) as bools, array_sort(int_array) as ints, array_sort(bigint_array) as bigints, array_sort(double_array) as doubles, array_sort(string_array) as strings, array_sort(decimal_array) as decimals FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"bool_array": [false, true, false], "int_array": [0,0,1,0,-1], "bigint_array": [234, -123, 345], "double_array": [0.0, 0.1, -12345.678, 0.2, 0.3], "string_array": ["foo", "bar"], "decimal_array": [1.0, 1.1, -0.2, 1.9, 9.0, -9.9]}}, + {"topic": "test_topic", "key": "r2", "value": {"bool_array": [null, false, true], "int_array": [0,null,1,0,-1], "bigint_array": [null, -123, 345], "double_array": [0.0, 0.1, -12345.678, null, 0.3], "string_array": ["foo", "fo", "Food", null, "F", "food"], "decimal_array": [1.0, 1.1, 1.2, -0.2, null, 9.0]}}, + {"topic": "test_topic", "key": "r3", "value": {"bool_array": [], "int_array": [], "bigint_array": [], "double_array": [], "string_array": [], "decimal_array": []}}, + {"topic": "test_topic", "key": "r4", "value": {"bool_array": null, "int_array": null, "bigint_array": null, "double_array": null, "string_array": null, "decimal_array": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": { + 
"BOOLS": [false, false, true], + "INTS": [-1, 0, 0, 0, 1], + "BIGINTS": [-123, 234, 345], + "DOUBLES": [-12345.678, 0.0, 0.1, 0.2, 0.3], + "STRINGS": ["bar", "foo"], + "DECIMALS": [-9.9, -0.2, 1.0, 1.1, 1.9, 9.0]}}, + {"topic": "OUTPUT", "key": "r2", "value": { + "BOOLS": [false, true, null], + "INTS": [-1, 0, 0, 1, null], + "BIGINTS": [-123, 345, null], + "DOUBLES": [-12345.678, 0.0, 0.1, 0.3, null], + "STRINGS": ["F", "Food", "fo", "foo", "food", null], + "DECIMALS": [-0.2, 1.0, 1.1, 1.2, 9.0, null]}}, + {"topic": "OUTPUT", "key": "r3", "value": { + "BOOLS": [], + "INTS": [], + "BIGINTS": [], + "DOUBLES": [], + "STRINGS": [], + "DECIMALS": [] }}, + {"topic": "OUTPUT", "key": "r4", "value": { + "BOOLS": null, + "INTS": null, + "BIGINTS": null, + "DOUBLES": null, + "STRINGS": null, + "DECIMALS": null}} + ] + }, + { + "name": "array_sort_desc", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, bool_array ARRAY<BOOLEAN>, int_array ARRAY<INT>, bigint_array ARRAY<BIGINT>, double_array ARRAY<DOUBLE>, string_array ARRAY<STRING>, decimal_array ARRAY<DECIMAL(2,1)>) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ID, array_sort(bool_array, 'desc') as bools, array_sort(int_array, 'desc') as ints, array_sort(bigint_array, 'desc') as bigints, array_sort(double_array, 'desc') as doubles, array_sort(string_array, 'desc') as strings, array_sort(decimal_array, 'desc') as decimals FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"bool_array": [false, true, false], "int_array": [0,0,1,0,-1], "bigint_array": [234, -123, 345], "double_array": [0.0, 0.1, -12345.678, 0.2, 0.3], "string_array": ["foo", "bar"], "decimal_array": [1.0, 1.1, -0.2, 1.9, 9.0, -9.9]}}, + {"topic": "test_topic", "key": "r2", "value": {"bool_array": [null, false, true], "int_array": [0,null,1,0,-1], "bigint_array": [null, -123, 345], "double_array": [0.0, 0.1, -12345.678, null, 0.3], "string_array": ["foo", "fo", "Food", null, "F", "food"], "decimal_array": [1.0, 1.1, 1.2, -0.2, null, 
9.0]}}, + {"topic": "test_topic", "key": "r3", "value": {"bool_array": [], "int_array": [], "bigint_array": [], "double_array": [], "string_array": [], "decimal_array": []}}, + {"topic": "test_topic", "key": "r4", "value": {"bool_array": null, "int_array": null, "bigint_array": null, "double_array": null, "string_array": null, "decimal_array": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": { + "BOOLS": [true, false, false], + "INTS": [1, 0, 0, 0, -1], + "BIGINTS": [345, 234, -123], + "DOUBLES": [0.3, 0.2, 0.1, 0.0, -12345.678], + "STRINGS": ["foo", "bar"], + "DECIMALS": [9.0, 1.9, 1.1, 1.0, -0.2, -9.9]}}, + {"topic": "OUTPUT", "key": "r2", "value": { + "BOOLS": [true, false, null], + "INTS": [1, 0, 0, -1, null], + "BIGINTS": [345, -123, null], + "DOUBLES": [0.3, 0.1, 0.0, -12345.678, null], + "STRINGS": ["food", "foo", "fo", "Food", "F", null], + "DECIMALS": [9.0, 1.2, 1.1, 1.0, -0.2, null]}}, + {"topic": "OUTPUT", "key": "r3", "value": { + "BOOLS": [], + "INTS": [], + "BIGINTS": [], + "DOUBLES": [], + "STRINGS": [], + "DECIMALS": [] }}, + {"topic": "OUTPUT", "key": "r4", "value": { + "BOOLS": null, + "INTS": null, + "BIGINTS": null, + "DOUBLES": null, + "STRINGS": null, + "DECIMALS": null}} + ] + } + ] +} \ No newline at end of file