diff --git a/ksql-engine/src/test/java/io/confluent/ksql/EndToEndEngineTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/EndToEndEngineTestUtil.java index 7e8597da438b..180323fa8e1c 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/EndToEndEngineTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/EndToEndEngineTestUtil.java @@ -119,13 +119,11 @@ private static class ValueSpec { this.spec = spec; } + @SuppressWarnings("ConstantConditions") private static void compare(final Object o1, final Object o2, final String path) { if (o1 == null && o2 == null) { return; } - if (o1 == null || o2 == null) { - throw new AssertionError("Unexpected null at path " + path); - } if (o1 instanceof Map) { assertThat("type mismatch at " + path, o2, instanceOf(Map.class)); assertThat("keyset mismatch at " + path, ((Map) o1).keySet(), equalTo(((Map)o2).keySet())); @@ -139,8 +137,8 @@ private static void compare(final Object o1, final Object o2, final String path) compare(((List) o1).get(i), ((List) o2).get(i), path + "." + i); } } else { + assertThat("mismatch at path " + path, o1, equalTo(o2)); assertThat("type mismatch at " + path, o1.getClass(), equalTo(o2.getClass())); - assertThat("mismatch at path" + path, o1, equalTo(o2)); } } @@ -156,6 +154,11 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(spec); } + + @Override + public String toString() { + return Objects.toString(spec); + } } protected interface SerdeSupplier { @@ -605,11 +608,12 @@ void verifyOutput(final TopologyTestDriver testDriver, } } catch (final AssertionError assertionError) { final String rowMsg = idx == -1 ? "" : " while processing output row " + idx; + final String topicMsg = idx == -1 ? "" : " topic: " + outputRecords.get(idx).topic.name; throw new AssertionError("TestCase name: " + name + " in file: " + testPath - + " failed" + rowMsg + " due to: " - + assertionError.getMessage()); + + " failed" + rowMsg + topicMsg + " due to: " + + assertionError.getMessage(), assertionError); } } diff --git a/ksql-engine/src/test/resources/query-validation-tests/group-by.json b/ksql-engine/src/test/resources/query-validation-tests/group-by.json index 32d470c233f1..f48ec9a49047 100644 --- a/ksql-engine/src/test/resources/query-validation-tests/group-by.json +++ b/ksql-engine/src/test/resources/query-validation-tests/group-by.json @@ -17,12 +17,43 @@ {"topic": "test_topic", "key": "d1", "value": "d1"} ], "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": "d2,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,2"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": "d2,0,2"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,3"}, {"topic": "OUTPUT", "key": "d1", "value": "d1,1"}, {"topic": "OUTPUT", "key": "d2", "value": "d2,1"}, {"topic": "OUTPUT", "key": "d1", "value": "d1,2"}, {"topic": "OUTPUT", "key": "d2", "value": "d2,2"}, {"topic": "OUTPUT", "key": "d1", "value": "d1,3"} ] + },{ + "name": "field (stream->table) - format", + "statements": [ + "CREATE STREAM TEST (data VARCHAR) WITH (kafka_topic='test_topic', KEY='data', value_format='{FORMAT}');", + "CREATE TABLE OUTPUT AS SELECT data, COUNT(*) FROM TEST GROUP BY DATA;" + ], + "format": ["AVRO", "JSON"], + "inputs": [ + {"topic": "test_topic", "key": "d1", "value": {"DATA": "d1"}}, + {"topic": "test_topic", "key": "d2", "value": {"DATA": "d2"}}, + {"topic": "test_topic", "key": "d1", "value": {"DATA": "d1"}}, + {"topic": "test_topic", "key": "d2", "value": {"DATA": "d2"}}, + {"topic": "test_topic", "key": "d1", "value": {"DATA": "d1"}} + ], + "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 2}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 2}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 3}}, + {"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":1}}, + {"topic": "OUTPUT", "key": "d2", "value": {"DATA": "d2", "KSQL_COL_1":1}}, + {"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":2}}, + {"topic": "OUTPUT", "key": "d2", "value": {"DATA": "d2", "KSQL_COL_1":2}}, + {"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":3}} + ] }, { "name": "fields (stream->table)", @@ -38,6 +69,11 @@ {"topic": "test_topic", "key": 3, "value": "3,a"} ], "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": "2,b,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,2"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": "2,b,0,2"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|3", "value": "3,a,0,1"}, {"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,1"}, {"topic": "OUTPUT", "key": "b|+|2", "value": "2,b,1"}, {"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,2"}, @@ -45,6 +81,33 @@ {"topic": "OUTPUT", "key": "a|+|3", "value": "3,a,1"} ] }, + { + "name": "fields (stream->table) - format", + "statements": [ + "CREATE STREAM TEST (f1 INT, f2 VARCHAR) WITH (kafka_topic='test_topic', KEY='f1', value_format='{FORMAT}');", + "CREATE TABLE OUTPUT AS SELECT f1, f2, COUNT(*) FROM TEST GROUP BY f2, f1;" + ], + "format": ["AVRO", "JSON"], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}}, + {"topic": "test_topic", "key": 2, "value": {"F1": 2, "F2": "b"}}, + {"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}}, + {"topic": "test_topic", "key": 2, "value": {"F1": 2, "F2": "b"}}, + {"topic": "test_topic", "key": 3, "value": {"F1": 3, "F2": "a"}} + ], + "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": {"KSQL_INTERNAL_COL_0": 2, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 2}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": {"KSQL_INTERNAL_COL_0": 2, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 2}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|3", "value": {"KSQL_INTERNAL_COL_0": 3, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 1}}, + {"topic": "OUTPUT", "key": "b|+|2", "value": {"F1": 2, "F2": "b", "KSQL_COL_2": 1}}, + {"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 2}}, + {"topic": "OUTPUT", "key": "b|+|2", "value": {"F1": 2, "F2": "b", "KSQL_COL_2": 2}}, + {"topic": "OUTPUT", "key": "a|+|3", "value": {"F1": 3, "F2": "a", "KSQL_COL_2": 1}} + ] + }, { "name": "with groupings (stream->table)", "statements": [ @@ -80,6 +143,13 @@ {"topic": "test_topic", "key": 1, "value": "1,a"} ], "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": "2,b,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,0"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|1", "value": "1,b,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": "2,b,0,0"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|1", "value": "1,b,0,0"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,1"}, {"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,1"}, {"topic": "OUTPUT", "key": "b|+|2", "value": "2,b,1"}, {"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,0"}, @@ -89,6 +159,37 @@ {"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,1"} ] }, + { + "name": "fields (table->table) - format", + "statements": [ + "CREATE TABLE TEST (f1 INT, f2 VARCHAR) WITH (kafka_topic='test_topic', KEY='f1', value_format='{FORMAT}');", + "CREATE TABLE OUTPUT AS SELECT f1, f2, COUNT(*) FROM TEST GROUP BY f2, f1;" + ], + "format": ["AVRO", "JSON"], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}}, + {"topic": "test_topic", "key": 2, "value": {"F1": 2, "F2": "b"}}, + {"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "b"}}, + {"topic": "test_topic", "key": 2, "value": null}, + {"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}} + ], + "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": {"KSQL_INTERNAL_COL_0": 2, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 0}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": {"KSQL_INTERNAL_COL_0": 2, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 0}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 0}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 1}}, + {"topic": "OUTPUT", "key": "b|+|2", "value": {"F1": 2, "F2": "b", "KSQL_COL_2": 1}}, + {"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 0}}, + {"topic": "OUTPUT", "key": "b|+|1", "value": {"F1": 1, "F2": "b", "KSQL_COL_2": 1}}, + {"topic": "OUTPUT", "key": "b|+|2", "value": {"F1": 2, "F2": "b", "KSQL_COL_2": 0}}, + {"topic": "OUTPUT", "key": "b|+|1", "value": {"F1": 1, "F2": "b", "KSQL_COL_2": 0}}, + {"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 1}} + ] + }, { "name": "field with re-key (stream->table)", "statements": [ @@ -103,6 +204,16 @@ {"topic": "test_topic", "value": "d1"} ], "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": "d1,0"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d2", "value": "d2,0"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": "d1,0"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d2", "value": "d2,0"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": "d1,0"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": "d2,0,1"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,2"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": "d2,0,2"}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,3"}, {"topic": "OUTPUT", "key": "d1", "value": "d1,1"}, {"topic": "OUTPUT", "key": "d2", "value": "d2,1"}, {"topic": "OUTPUT", "key": "d1", "value": "d1,2"}, @@ -110,6 +221,38 @@ {"topic": "OUTPUT", "key": "d1", "value": "d1,3"} ] }, + { + "name": "field with re-key (stream->table) - format", + "statements": [ + "CREATE STREAM TEST (data VARCHAR) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", + "CREATE TABLE OUTPUT AS SELECT data, COUNT(*) FROM TEST GROUP BY DATA;" + ], + "format": ["AVRO", "JSON"], + "inputs": [ + {"topic": "test_topic", "value": {"DATA": "d1"}}, + {"topic": "test_topic", "value": {"DATA": "d2"}}, + {"topic": "test_topic", "value": {"DATA": "d1"}}, + {"topic": "test_topic", "value": {"DATA": "d2"}}, + {"topic": "test_topic", "value": {"DATA": "d1"}} + ], + "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 1}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 2}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 2}}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 3}}, + {"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":1}}, + {"topic": "OUTPUT", "key": "d2", "value": {"DATA": "d2", "KSQL_COL_1":1}}, + {"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":2}}, + {"topic": "OUTPUT", "key": "d2", "value": {"DATA": "d2", "KSQL_COL_1":2}}, + {"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":3}} + ] + }, { "name": "field with re-key (table->table)", "statements": [ diff --git a/ksql-engine/src/test/resources/query-validation-tests/joins.json b/ksql-engine/src/test/resources/query-validation-tests/joins.json index 8bb2c768ddd2..e2171bd05d52 100644 --- a/ksql-engine/src/test/resources/query-validation-tests/joins.json +++ b/ksql-engine/src/test/resources/query-validation-tests/joins.json @@ -25,9 +25,16 @@ {"topic": "right_topic", "key": 100, "value": {"ID": 100, "F1": "newblah", "F2": 150}, "timestamp": 16000}, {"topic": "left_topic", "key": 90, "value": {"ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 17000}, {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 30000} - ], "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001", "value": {"ROWTIME": 0, "ROWKEY": "0", "ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000009-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u0000'\u0010\u0000\u0000\u0000\u0001", "value": {"ROWTIME": 10000, "ROWKEY": "0", "ID": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "key": "10\u0000\u0000\u0000\u0000\u0000\u0000*�\u0000\u0000\u0000\u0002", "value": {"ROWTIME": 11000, "ROWKEY": "10", "ID": 10, "NAME": "100", "VALUE": 5}, "timestamp": 11000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u00002�\u0000\u0000\u0000\u0003", "value": {"ROWTIME": 13000, "ROWKEY": "0", "ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 13000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000009-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u0000:�\u0000\u0000\u0000\u0002", "value": {"ROWTIME": 15000, "ROWKEY": "0", "ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000009-store-changelog", "key": "100\u0000\u0000\u0000\u0000\u0000\u0000>�\u0000\u0000\u0000\u0003", "value": {"ROWTIME": 16000, "ROWKEY": "100", "ID": 100, "F1": "newblah", "F2": 150}, "timestamp": 16000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "key": "90\u0000\u0000\u0000\u0000\u0000\u0000Bh\u0000\u0000\u0000\u0004", "value": {"ROWTIME": 17000, "ROWKEY": "90", "ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 17000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u0000u0\u0000\u0000\u0000\u0005", "value": {"ROWTIME": 30000, "ROWKEY": "0", "ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 30000}, {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": null, "F2": null}, "timestamp": 0}, {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, {"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": null, "F2": null}, "timestamp": 11000}, @@ -37,7 +44,51 @@ {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": null, "F2": null}, "timestamp": 30000} ] }, - { + { + "name": "stream stream left join - rekey", + "format": ["AVRO", "JSON"], + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", + "CREATE STREAM TEST_STREAM (ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}');", + "CREATE STREAM LEFT_OUTER_JOIN as SELECT t.id, name, value, f1, f2 FROM test t left join TEST_STREAM tt WITHIN 11 seconds ON t.id = tt.id;" + ], + "inputs": [ + {"topic": "left_topic", "value": {"ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}, + {"topic": "right_topic", "value": {"ID": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "left_topic", "value": {"ID": 10, "NAME": "100", "VALUE": 5}, "timestamp": 11000}, + {"topic": "left_topic", "value": {"ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 13000}, + {"topic": "right_topic", "value": {"ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "right_topic", "value": {"ID": 100, "F1": "newblah", "F2": 150}, "timestamp": 16000}, + {"topic": "left_topic", "value": {"ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 17000}, + {"topic": "left_topic", "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 30000} + ], + "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": "0", "value": {"ROWTIME": 0, "ROWKEY": "0", "ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": "0", "value": {"ROWTIME": 10000, "ROWKEY": "0", "ID": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": "10", "value": {"ROWTIME": 11000, "ROWKEY": "10", "ID": 10, "NAME": "100", "VALUE": 5}, "timestamp": 11000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": "0", "value": {"ROWTIME": 13000, "ROWKEY": "0", "ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 13000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": "0", "value": {"ROWTIME": 15000, "ROWKEY": "0", "ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": "100", "value": {"ROWTIME": 16000, "ROWKEY": "100", "ID": 100, "F1": "newblah", "F2": 150}, "timestamp": 16000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": "90", "value": {"ROWTIME": 17000, "ROWKEY": "90", "ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 17000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": "0", "value": {"ROWTIME": 30000, "ROWKEY": "0", "ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 30000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000020-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001", "value": {"ROWTIME": 0, "ROWKEY": "0", "ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000021-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u0000'\u0010\u0000\u0000\u0000\u0001", "value": {"ROWTIME": 10000, "ROWKEY": "0", "ID": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000020-store-changelog", "key": "10\u0000\u0000\u0000\u0000\u0000\u0000*�\u0000\u0000\u0000\u0002", "value": {"ROWTIME": 11000, "ROWKEY": "10", "ID": 10, "NAME": "100", "VALUE": 5}, "timestamp": 11000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000020-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u00002�\u0000\u0000\u0000\u0003", "value": {"ROWTIME": 13000, "ROWKEY": "0", "ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 13000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000021-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u0000:�\u0000\u0000\u0000\u0002", "value": {"ROWTIME": 15000, "ROWKEY": "0", "ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000021-store-changelog", "key": "100\u0000\u0000\u0000\u0000\u0000\u0000>�\u0000\u0000\u0000\u0003", "value": {"ROWTIME": 16000, "ROWKEY": "100", "ID": 100, "F1": "newblah", "F2": 150}, "timestamp": 16000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000020-store-changelog", "key": "90\u0000\u0000\u0000\u0000\u0000\u0000Bh\u0000\u0000\u0000\u0004", "value": {"ROWTIME": 17000, "ROWKEY": "90", "ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 17000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000020-store-changelog", "key": "0\u0000\u0000\u0000\u0000\u0000\u0000u0\u0000\u0000\u0000\u0005", "value": {"ROWTIME": 30000, "ROWKEY": "0", "ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 30000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": null, "F2": null}, "timestamp": 0}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": null, "F2": null}, "timestamp": 11000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "blah", "F2": 50}, "timestamp": 13000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "LEFT_OUTER_JOIN", "key": 90, "value": {"T_ID": 90, "NAME": "ninety", "VALUE": 90, "F1": null, "F2": null}, "timestamp": 17000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": null, "F2": null}, "timestamp": 30000} + ] + }, + { "name": "stream stream inner join", "format": ["AVRO", "JSON"], "statements": [