diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/Entries.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/Entries.java new file mode 100644 index 000000000000..6eb62c1d41ac --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/Entries.java @@ -0,0 +1,126 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.array; + +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +/** + * This UDF constructs an array of structs from the entries in a map. Each struct has a field with + * name "K" containing the key (this is always a String) and a field with name "V" holding the + * value; + */ +@UdfDescription(name = "ENTRIES", + description = + "Construct an array from the entries in a map." + + "The array can be optionally sorted on the keys." +) +public class Entries { + + private static final Schema INT_STRUCT_SCHEMA = buildStructSchema(Schema.OPTIONAL_INT32_SCHEMA); + private static final Schema BIGINT_STRUCT_SCHEMA = buildStructSchema( + Schema.OPTIONAL_INT64_SCHEMA); + private static final Schema DOUBLE_STRUCT_SCHEMA = buildStructSchema( + Schema.OPTIONAL_FLOAT64_SCHEMA); + private static final Schema BOOLEAN_STRUCT_SCHEMA = buildStructSchema( + Schema.OPTIONAL_BOOLEAN_SCHEMA); + private static final Schema STRING_STRUCT_SCHEMA = buildStructSchema( + Schema.OPTIONAL_STRING_SCHEMA); + private static final String KEY_FIELD_NAME = "K"; + private static final String VALUE_FIELD_NAME = "V"; + + private static Schema buildStructSchema(final Schema valueSchema) { + return SchemaBuilder.struct().field(KEY_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .field(VALUE_FIELD_NAME, valueSchema).optional().build(); + } + + @Udf(schema = "ARRAY>") + public List entriesInt( + @UdfParameter(description = "The map to create entries from") final Map map, + @UdfParameter(description = "If true then the resulting entries are sorted by key") + final boolean sorted + ) { + return entries(map, INT_STRUCT_SCHEMA, sorted); + } + + @Udf(schema = "ARRAY>") + public List entriesBigInt( + @UdfParameter(description = "The map to create entries from") final Map map, + @UdfParameter(description = "If true then the resulting entries are sorted by key") + final boolean sorted + ) { + return entries(map, BIGINT_STRUCT_SCHEMA, sorted); + } + + @Udf(schema = "ARRAY>") + public List entriesDouble( + @UdfParameter(description = "The map to create entries from") final Map map, + @UdfParameter(description = "If true then the resulting entries are sorted by key") + final boolean sorted + ) { + return entries(map, DOUBLE_STRUCT_SCHEMA, sorted); + } + + @Udf(schema = "ARRAY>") + public List entriesBoolean( + @UdfParameter(description = "The map to create entries from") final Map map, + @UdfParameter(description = "If true then the resulting entries are sorted by key") + final boolean sorted + ) { + return entries(map, BOOLEAN_STRUCT_SCHEMA, sorted); + } + + @Udf(schema = "ARRAY>") + public List entriesString( + @UdfParameter(description = "The map to create entries from") final Map map, + @UdfParameter(description = "If true then the resulting entries are sorted by key") + final boolean sorted + ) { + return entries(map, STRING_STRUCT_SCHEMA, sorted); + } + + private List entries( + final Map map, final Schema structSchema, final boolean sorted + ) { + if (map == null) { + return null; + } + final List structs = new ArrayList<>(map.size()); + Collection> entries = map.entrySet(); + if (sorted) { + final List> list = new ArrayList<>(entries); + list.sort(Comparator.comparing(Entry::getKey)); + entries = list; + } + for (Map.Entry entry : entries) { + final Struct struct = new Struct(structSchema); + struct.put(KEY_FIELD_NAME, entry.getKey()).put(VALUE_FIELD_NAME, entry.getValue()); + structs.add(struct); + } + return structs; + } + +} \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/GenerateSeries.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/GenerateSeries.java new file mode 100644 index 000000000000..a2ae5a9d4acf --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/GenerateSeries.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.array; + +import io.confluent.ksql.function.KsqlFunctionException; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import java.util.ArrayList; +import java.util.List; + +/** + * This UDF constructs an array containing an array of INTs or BIGINTs in the specified range + */ +@UdfDescription(name = "GENERATE_SERIES", description = "Construct an array of a range of values") +public class GenerateSeries { + + @Udf + public List generateSeriesInt( + @UdfParameter(description = "The beginning of the series") final int start, + @UdfParameter(description = "Marks the end of the series (inclusive)") final int end + ) { + return generateSeriesInt(start, end, end - start > 0 ? 1 : -1); + } + + @Udf + public List generateSeriesInt( + @UdfParameter(description = "The beginning of the series") final int start, + @UdfParameter(description = "Marks the end of the series (inclusive)") final int end, + @UdfParameter(description = "Difference between each value in the series") final int step + ) { + checkStep(step); + final int diff = end - start; + if (diff > 0 && step < 0 || diff < 0 && step > 0) { + throw new KsqlFunctionException("GENERATE_SERIES step has wrong sign"); + } + final int size = 1 + diff / step; + final List result = new ArrayList<>(size); + int pos = 0; + int val = start; + while (pos++ < size) { + result.add(val); + val += step; + } + return result; + } + + @Udf + public List generateSeriesLong( + @UdfParameter(description = "The beginning of the series") final long start, + @UdfParameter(description = "Marks the end of the series (inclusive)") final long end + ) { + return generateSeriesLong(start, end, end - start > 0 ? 1 : -1); + } + + @Udf + public List generateSeriesLong( + @UdfParameter(description = "The beginning of the series") final long start, + @UdfParameter + (description = "Marks the end of the series (inclusive)") final long end, + @UdfParameter(description = "Difference between each value in the series") final int step + ) { + checkStep(step); + final long diff = end - start; + if (diff > 0 && step < 0 || diff < 0 && step > 0) { + throw new KsqlFunctionException("GENERATE_SERIES step has wrong sign"); + } + final int size = 1 + (int) (diff / step); + final List result = new ArrayList<>(size); + int pos = 0; + long val = start; + while (pos++ < size) { + result.add(val); + val += step; + } + return result; + } + + private void checkStep(final int step) { + if (step == 0) { + throw new KsqlFunctionException("GENERATE_SERIES step cannot be zero"); + } + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/EntriesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/EntriesTest.java new file mode 100644 index 000000000000..a02d417b8482 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/EntriesTest.java @@ -0,0 +1,160 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.array; + +import static junit.framework.TestCase.assertNull; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; + +public class EntriesTest { + + private static final int ENTRIES = 20; + + private Entries entriesUdf = new Entries(); + + @Test + public void shouldComputeIntEntries() { + Map map = createMap(i -> i); + shouldComputeEntries(map, () -> entriesUdf.entriesInt(map, false)); + } + + @Test + public void shouldComputeBigIntEntries() { + Map map = createMap(Long::valueOf); + shouldComputeEntries(map, () -> entriesUdf.entriesBigInt(map, false)); + } + + @Test + public void shouldComputeDoubleEntries() { + Map map = createMap(Double::valueOf); + shouldComputeEntries(map, () -> entriesUdf.entriesDouble(map, false)); + } + + @Test + public void shouldComputeBooleanEntries() { + Map map = createMap(i -> i % 2 == 0); + shouldComputeEntries(map, () -> entriesUdf.entriesBoolean(map, false)); + } + + @Test + public void shouldComputeStringEntries() { + Map map = createMap(String::valueOf); + shouldComputeEntries(map, () -> entriesUdf.entriesString(map, false)); + } + + @Test + public void shouldComputeIntEntriesSorted() { + Map map = createMap(i -> i); + shouldComputeEntriesSorted(map, () -> entriesUdf.entriesInt(map, true)); + } + + @Test + public void shouldComputeBigIntEntriesSorted() { + Map map = createMap(Long::valueOf); + shouldComputeEntriesSorted(map, () -> entriesUdf.entriesBigInt(map, true)); + } + + @Test + public void shouldComputeDoubleEntriesSorted() { + Map map = createMap(Double::valueOf); + shouldComputeEntriesSorted(map, () -> entriesUdf.entriesDouble(map, true)); + } + + @Test + public void shouldComputeBooleanEntriesSorted() { + Map map = createMap(i -> i % 2 == 0); + shouldComputeEntriesSorted(map, () -> entriesUdf.entriesBoolean(map, true)); + } + + @Test + public void shouldComputeStringEntriesSorted() { + Map map = createMap(String::valueOf); + shouldComputeEntriesSorted(map, () -> entriesUdf.entriesString(map, true)); + } + + @Test + public void shouldReturnNullListForNullMapInt() { + assertNull(entriesUdf.entriesInt(null, false)); + } + + @Test + public void shouldReturnNullListForNullMapBigInt() { + assertNull(entriesUdf.entriesBigInt(null, false)); + } + + @Test + public void shouldReturnNullListForNullMapDouble() { + assertNull(entriesUdf.entriesDouble(null, false)); + } + + @Test + public void shouldReturnNullListForNullMapBoolean() { + assertNull(entriesUdf.entriesBoolean(null, false)); + } + + @Test + public void shouldReturnNullListForNullMapString() { + assertNull(entriesUdf.entriesString(null, false)); + } + + private void shouldComputeEntries( + Map map, Supplier> supplier + ) { + List out = supplier.get(); + assertThat(out, hasSize(map.size())); + for (int i = 0; i < out.size(); i++) { + Struct struct = out.get(i); + T val = map.get(struct.getString("K")); + assertThat(val == null, is(false)); + assertThat(val, is(struct.get("V"))); + } + } + + private void shouldComputeEntriesSorted(Map map, Supplier> supplier) { + List out = supplier.get(); + List> entries = new ArrayList<>(map.entrySet()); + entries.sort(Comparator.comparing(Entry::getKey)); + assertThat(out.size(), is(entries.size())); + for (int i = 0; i < entries.size(); i++) { + Struct struct = out.get(i); + Map.Entry entry = entries.get(i); + assertThat(struct.get("K"), is(entry.getKey())); + assertThat(struct.get("V"), is(entry.getValue())); + } + } + + private Map createMap(Function valueSupplier) { + Map map = new HashMap<>(); + for (int i = 0; i < ENTRIES; i++) { + map.put(UUID.randomUUID().toString(), valueSupplier.apply(i)); + } + return map; + } + +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/GenerateSeriesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/GenerateSeriesTest.java new file mode 100644 index 000000000000..993043812a22 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/GenerateSeriesTest.java @@ -0,0 +1,205 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.array; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import io.confluent.ksql.function.KsqlFunctionException; +import java.util.List; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class GenerateSeriesTest { + + private GenerateSeries rangeUdf = new GenerateSeries(); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldComputePositiveIntRange() { + List range = rangeUdf.generateSeriesInt(0, 9); + assertThat(range, hasSize(10)); + int val = 0; + for (Integer i : range) { + assertThat(val++, is(i)); + } + } + + @Test + public void shouldComputeNegativeIntRange() { + List range = rangeUdf.generateSeriesInt(9, 0); + assertThat(range, hasSize(10)); + int val = 9; + for (Integer i : range) { + assertThat(val--, is(i)); + } + } + + @Test + public void shouldComputeLongRange() { + List range = rangeUdf.generateSeriesLong(0, 9); + assertThat(range, hasSize(10)); + long val = 0; + for (Long i : range) { + assertThat(val++, is(i)); + } + } + + @Test + public void shouldComputeNegativeLongRange() { + List range = rangeUdf.generateSeriesLong(9, 0); + assertThat(range, hasSize(10)); + long val = 9; + for (Long i : range) { + assertThat(val--, is(i)); + } + } + + @Test + public void shouldComputeIntRangeWithPositiveEvenStepInt() { + List range = rangeUdf.generateSeriesInt(0, 9, 2); + assertThat(range, hasSize(5)); + int val = 0; + for (int i : range) { + assertThat(val, is(i)); + val += 2; + } + } + + @Test + public void shouldComputeIntRangeWithPositiveOddStepInt() { + List range = rangeUdf.generateSeriesInt(0, 9, 3); + assertThat(range, hasSize(4)); + int val = 0; + for (int i : range) { + assertThat(val, is(i)); + val += 3; + } + } + + @Test + public void shouldComputeIntRangeWithNegativeEvenStepInt() { + List range = rangeUdf.generateSeriesInt(9, 0, -2); + assertThat(range, hasSize(5)); + int val = 9; + for (int i : range) { + assertThat(val, is(i)); + val -= 2; + } + } + + @Test + public void shouldComputeIntRangeWithNegativeOddStepInt() { + List range = rangeUdf.generateSeriesInt(9, 0, -3); + assertThat(range, hasSize(4)); + int val = 9; + for (int i : range) { + assertThat(val, is(i)); + val -= 3; + } + } + + @Test + public void shouldComputeIntRangeWithEvenStepLong() { + List range = rangeUdf.generateSeriesLong(0, 9, 2); + assertThat(range, hasSize(5)); + long index = 0; + for (long i : range) { + assertThat(index, is(i)); + index += 2; + } + } + + @Test + public void shouldComputeIntRangeWithOddStepLong() { + List range = rangeUdf.generateSeriesLong(0, 9, 3); + assertThat(range, hasSize(4)); + long index = 0; + for (long i : range) { + assertThat(index, is(i)); + index += 3; + } + } + + @Test + public void shouldComputeIntRangeWithNegativeEvenStepLong() { + List range = rangeUdf.generateSeriesLong(9, 0, -2); + assertThat(range, hasSize(5)); + long val = 9; + for (long i : range) { + assertThat(val, is(i)); + val -= 2; + } + } + + @Test + public void shouldComputeIntRangeWithNegativeOddStepLong() { + List range = rangeUdf.generateSeriesLong(9, 0, -3); + assertThat(range, hasSize(4)); + long val = 9; + for (long i : range) { + assertThat(val, is(i)); + val -= 3; + } + } + + @Test + public void shouldThrowOnStepZeroInt() { + expectedException.expect(KsqlFunctionException.class); + expectedException.expectMessage("GENERATE_SERIES step cannot be zero"); + rangeUdf.generateSeriesInt(0, 10, 0); + } + + @Test + public void shouldThrowOnStepZeroLong() { + expectedException.expect(KsqlFunctionException.class); + expectedException.expectMessage("GENERATE_SERIES step cannot be zero"); + rangeUdf.generateSeriesLong(0L, 10L, 0); + } + + @Test + public void shouldThrowIfStepWrongSignInt1() { + expectedException.expect(KsqlFunctionException.class); + expectedException.expectMessage("GENERATE_SERIES step has wrong sign"); + rangeUdf.generateSeriesInt(0, 10, -1); + } + + @Test + public void shouldThrowIfStepWrongSignInt2() { + expectedException.expect(KsqlFunctionException.class); + expectedException.expectMessage("GENERATE_SERIES step has wrong sign"); + rangeUdf.generateSeriesInt(9, 0, 1); + } + + @Test + public void shouldThrowIfStepWrongSignLong1() { + expectedException.expect(KsqlFunctionException.class); + expectedException.expectMessage("GENERATE_SERIES step has wrong sign"); + rangeUdf.generateSeriesLong(0, 10, -1); + } + + @Test + public void shouldThrowIfStepWrongSignLong2() { + expectedException.expect(KsqlFunctionException.class); + expectedException.expectMessage("GENERATE_SERIES step has wrong sign"); + rangeUdf.generateSeriesLong(9, 0, 1); + } + +} diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/array.json b/ksql-functional-tests/src/test/resources/query-validation-tests/array.json new file mode 100644 index 000000000000..87ca5f01d975 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/array.json @@ -0,0 +1,81 @@ +{ + "comments": [ + "Tests covering the use of the array returning UDFs." + ], + "tests": [ + { + "name": "entries sorted", + "statements": [ + "CREATE STREAM TEST (INTMAP MAP, BIGINTMAP MAP, DOUBLEMAP MAP, BOOLEANMAP MAP, STRINGMAP MAP, NULLMAP MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ENTRIES(INTMAP, TRUE), ENTRIES(BIGINTMAP, TRUE), ENTRIES(DOUBLEMAP, TRUE), ENTRIES(BOOLEANMAP, TRUE), ENTRIES(STRINGMAP, TRUE), ENTRIES(NULLMAP, TRUE) FROM TEST;" + ], + "inputs": [ + { + "topic": "test_topic", "key": 1, "value": { + "INTMAP": {"K1": 1, "K2": 2, "K3": 3}, + "BIGINTMAP": {"K1": 1, "K2": 2, "K3": 3}, + "DOUBLEMAP": {"K1": 1.0, "K2": 2.0, "K3": 3.0}, + "BOOLEANMAP": {"K1": true, "K2": false, "K3": true}, + "STRINGMAP": {"K1": "V1", "K2": "V2", "K3": "V3"}, + "NULLMAP": null + } + } + ], + "outputs": [ + { + "topic": "OUTPUT", "key": 1, + "value": { + "KSQL_COL_0": [{"K": "K1", "V": 1}, {"K": "K2", "V": 2}, {"K": "K3", "V": 3}], + "KSQL_COL_1": [{"K": "K1", "V": 1}, {"K": "K2", "V": 2}, {"K": "K3", "V": 3}], + "KSQL_COL_2": [{"K": "K1", "V": 1.0}, {"K": "K2", "V": 2.0}, {"K": "K3", "V": 3.0}], + "KSQL_COL_3": [{"K": "K1", "V": true}, {"K": "K2", "V": false}, {"K": "K3", "V": true}], + "KSQL_COL_4": [{"K": "K1", "V": "V1"}, {"K": "K2", "V": "V2"}, {"K": "K3", "V": "V3"}], + "KSQL_COL_5": null + } + } + ] + }, + { + "name": "GENERATE_SERIES", + "statements": [ + "CREATE STREAM TEST (F0 INT, F1 INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT GENERATE_SERIES(F0, F1) FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"F0": 0, "F1": 3}}, + {"topic": "test_topic", "key": 1, "value": {"F0": -2, "F1": 1}}, + {"topic": "test_topic", "key": 1, "value": {"F0": 4, "F1": 3}}, + {"topic": "test_topic", "key": 1, "value": {"F0": 4, "F1": 0}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [0, 1, 2, 3]}}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [-2, -1, 0, 1]}}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": []}}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": []}} + ] + }, + { + "name": "GENERATE_SERIES with step", + "statements": [ + "CREATE STREAM TEST (F0 INT, F1 INT, F2 INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT GENERATE_SERIES(F0, F1, F2) FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"F0": 0, "F1": 3, "F2": 1}}, + {"topic": "test_topic", "key": 1, "value": {"F0": -2, "F1": 1, "F2": 2}}, + {"topic": "test_topic", "key": 1, "value": {"F0": 0, "F1": 9, "F2": 3}}, + {"topic": "test_topic", "key": 1, "value": {"F0": 3, "F1": 0, "F2": -1}}, + {"topic": "test_topic", "key": 1, "value": {"F0": 1, "F1": -2, "F2": -2}}, + {"topic": "test_topic", "key": 1, "value": {"F0": 9, "F1": 0, "F2": -3}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [0, 1, 2, 3]}}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [-2, 0]}}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [0, 3, 6, 9]}}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [3, 2, 1, 0]}}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [1, -1]}}, + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [9, 6, 3, 0]}} + ] + } + ] +} \ No newline at end of file