From c239785179b3e60933ffd49360312f60bdc67088 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Mon, 22 Jul 2019 17:11:33 -0700 Subject: [PATCH] fix: `COLLECT_LIST` can now be applied to tables (#3104) --- .../function/udaf/array/CollectListUdaf.java | 11 +- .../udaf/array/CollectListUdafTest.java | 9 +- .../query-validation-tests/collect-list.json | 111 ++++++++++++++++++ 3 files changed, 121 insertions(+), 10 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java index 560ac60a2753..5b0c183f1a80 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java @@ -17,7 +17,6 @@ import com.google.common.collect.Lists; import io.confluent.ksql.function.udaf.TableUdaf; -import io.confluent.ksql.function.udaf.Udaf; import io.confluent.ksql.function.udaf.UdafDescription; import io.confluent.ksql.function.udaf.UdafFactory; import java.util.List; @@ -69,27 +68,27 @@ public List undo(final T valueToUndo, final List aggregateValue) { } @UdafFactory(description = "collect values of a Bigint field into a single Array") - public static Udaf> createCollectListLong() { + public static TableUdaf> createCollectListLong() { return listCollector(); } @UdafFactory(description = "collect values of an Integer field into a single Array") - public static Udaf> createCollectListInt() { + public static TableUdaf> createCollectListInt() { return listCollector(); } @UdafFactory(description = "collect values of a Double field into a single Array") - public static Udaf> createCollectListDouble() { + public static TableUdaf> createCollectListDouble() { return listCollector(); } @UdafFactory(description = "collect values of a String/Varchar field into a single Array") - public static Udaf> createCollectListString() { + public static TableUdaf> createCollectListString() { return listCollector(); } @UdafFactory(description = "collect values of a Boolean field into a single Array") - public static Udaf> createCollectListBool() { + public static TableUdaf> createCollectListBool() { return listCollector(); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java index 99b555c49691..9683d3fd934c 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java @@ -20,7 +20,8 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -import io.confluent.ksql.function.udaf.Udaf; + +import io.confluent.ksql.function.udaf.TableUdaf; import java.util.List; import org.junit.Test; @@ -28,7 +29,7 @@ public class CollectListUdafTest { @Test public void shouldCollectInts() { - final Udaf> udaf = CollectListUdaf.createCollectListInt(); + final TableUdaf> udaf = CollectListUdaf.createCollectListInt(); final Integer[] values = new Integer[] {3, 4, 5, 3}; List runningList = udaf.initialize(); for (final Integer i : values) { @@ -39,7 +40,7 @@ public void shouldCollectInts() { @Test public void shouldMergeIntLists() { - final Udaf> udaf = CollectListUdaf.createCollectListInt(); + final TableUdaf> udaf = CollectListUdaf.createCollectListInt(); List lhs = udaf.initialize(); final Integer[] lhsValues = new Integer[] {1, 2, null, 3}; @@ -61,7 +62,7 @@ public void shouldMergeIntLists() { @Test public void shouldRespectSizeLimit() { - final Udaf> udaf = CollectListUdaf.createCollectListInt(); + final TableUdaf> udaf = CollectListUdaf.createCollectListInt(); List runningList = udaf.initialize(); for (int i = 1; i < 2500; i++) { runningList = udaf.aggregate(i, runningList); diff --git a/ksql-engine/src/test/resources/query-validation-tests/collect-list.json b/ksql-engine/src/test/resources/query-validation-tests/collect-list.json index 40bc96b02de1..8288942a9e55 100644 --- a/ksql-engine/src/test/resources/query-validation-tests/collect-list.json +++ b/ksql-engine/src/test/resources/query-validation-tests/collect-list.json @@ -107,6 +107,117 @@ {"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[true,false]}, "timestamp": 0}, {"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[true,false,true]}, "timestamp": 0} ] + }, + { + "name": "collect_list int table", + "format": ["AVRO", "JSON"], + "statements": [ + "CREATE TABLE TEST (ID bigint, VALUE integer) WITH (kafka_topic='test_topic',value_format='{FORMAT}', key='ID');", + "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 0}, "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 100}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 500}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 100}, "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [0]}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [100]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [500]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [100]}, "timestamp": 0} + ] + }, + { + "name": "collect_list long table", + "format": ["AVRO", "JSON"], + "statements": [ + "CREATE TABLE TEST (ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", + "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 2147483648}, "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 100}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 500}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 100}, "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [2147483648]}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [100]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [500]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [100]}, "timestamp": 0} + ] + }, + { + "name": "collect_list double table", + "format": ["AVRO", "JSON"], + "statements": [ + "CREATE TABLE TEST (ID bigint, VALUE double) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", + "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 5.4}, "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 100.1}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 500.9}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 300.8}, "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [5.4]}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [100.1]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [500.9]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [300.8]}, "timestamp": 0} + ] + }, + { + "name": "collect_list string table", + "format": ["AVRO", "JSON"], + "statements": [ + "CREATE TABLE TEST (ID bigint, VALUE varchar) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", + "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": "foo"}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": "baz"}, "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": "bar"}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": "baz"}, "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": "foo"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": ["foo"]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": ["baz"]}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": ["bar"]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": ["baz"]}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0}, + {"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": ["foo"]}, "timestamp": 0} + ] + }, + { + "name": "collect_list bool map table", + "format": ["JSON"], + "statements": [ + "CREATE TABLE TEST (ID bigint, NAME varchar, VALUE map) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", + "CREATE TABLE S2 as SELECT id, collect_list(value['key1']) AS collected FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"id": 0, "name": "zero", "value": {"key1":true, "key2":false}}, "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": {"id": 0, "name": "zero", "value": {"key1":false, "key2":true}}, "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": {"id": 0, "name": "zero", "value": {"key1":true, "key2":true}}, "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[true]}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[]}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[false]}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[]}, "timestamp": 0}, + {"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[true]}, "timestamp": 0} + ] } ] } \ No newline at end of file