Skip to content

Commit

Permalink
Enhance join and group-by tests to cover the values stored in interna…
Browse files Browse the repository at this point in the history
…l topics. (#2598)

* Enhance join and group-by tests to cover the values stored in internal topics.
  • Loading branch information
big-andy-coates authored Mar 26, 2019
1 parent 8cfbfe1 commit a9efa39
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,11 @@ private static class ValueSpec {
this.spec = spec;
}

@SuppressWarnings("ConstantConditions")
private static void compare(final Object o1, final Object o2, final String path) {
if (o1 == null && o2 == null) {
return;
}
if (o1 == null || o2 == null) {
throw new AssertionError("Unexpected null at path " + path);
}
if (o1 instanceof Map) {
assertThat("type mismatch at " + path, o2, instanceOf(Map.class));
assertThat("keyset mismatch at " + path, ((Map) o1).keySet(), equalTo(((Map)o2).keySet()));
Expand All @@ -139,8 +137,8 @@ private static void compare(final Object o1, final Object o2, final String path)
compare(((List) o1).get(i), ((List) o2).get(i), path + "." + i);
}
} else {
assertThat("mismatch at path " + path, o1, equalTo(o2));
assertThat("type mismatch at " + path, o1.getClass(), equalTo(o2.getClass()));
assertThat("mismatch at path" + path, o1, equalTo(o2));
}
}

Expand All @@ -156,6 +154,11 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(spec);
}

@Override
public String toString() {
return Objects.toString(spec);
}
}

protected interface SerdeSupplier<T> {
Expand Down Expand Up @@ -605,11 +608,12 @@ void verifyOutput(final TopologyTestDriver testDriver,
}
} catch (final AssertionError assertionError) {
final String rowMsg = idx == -1 ? "" : " while processing output row " + idx;
final String topicMsg = idx == -1 ? "" : " topic: " + outputRecords.get(idx).topic.name;
throw new AssertionError("TestCase name: "
+ name
+ " in file: " + testPath
+ " failed" + rowMsg + " due to: "
+ assertionError.getMessage());
+ " failed" + rowMsg + topicMsg + " due to: "
+ assertionError.getMessage(), assertionError);
}
}

Expand Down
143 changes: 143 additions & 0 deletions ksql-engine/src/test/resources/query-validation-tests/group-by.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,43 @@
{"topic": "test_topic", "key": "d1", "value": "d1"}
],
"outputs": [
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": "d2,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,2"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": "d2,0,2"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,3"},
{"topic": "OUTPUT", "key": "d1", "value": "d1,1"},
{"topic": "OUTPUT", "key": "d2", "value": "d2,1"},
{"topic": "OUTPUT", "key": "d1", "value": "d1,2"},
{"topic": "OUTPUT", "key": "d2", "value": "d2,2"},
{"topic": "OUTPUT", "key": "d1", "value": "d1,3"}
]
},{
"name": "field (stream->table) - format",
"statements": [
"CREATE STREAM TEST (data VARCHAR) WITH (kafka_topic='test_topic', KEY='data', value_format='{FORMAT}');",
"CREATE TABLE OUTPUT AS SELECT data, COUNT(*) FROM TEST GROUP BY DATA;"
],
"format": ["AVRO", "JSON"],
"inputs": [
{"topic": "test_topic", "key": "d1", "value": {"DATA": "d1"}},
{"topic": "test_topic", "key": "d2", "value": {"DATA": "d2"}},
{"topic": "test_topic", "key": "d1", "value": {"DATA": "d1"}},
{"topic": "test_topic", "key": "d2", "value": {"DATA": "d2"}},
{"topic": "test_topic", "key": "d1", "value": {"DATA": "d1"}}
],
"outputs": [
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 2}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 2}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 3}},
{"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":1}},
{"topic": "OUTPUT", "key": "d2", "value": {"DATA": "d2", "KSQL_COL_1":1}},
{"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":2}},
{"topic": "OUTPUT", "key": "d2", "value": {"DATA": "d2", "KSQL_COL_1":2}},
{"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":3}}
]
},
{
"name": "fields (stream->table)",
Expand All @@ -38,13 +69,45 @@
{"topic": "test_topic", "key": 3, "value": "3,a"}
],
"outputs": [
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": "2,b,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,2"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": "2,b,0,2"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|3", "value": "3,a,0,1"},
{"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,1"},
{"topic": "OUTPUT", "key": "b|+|2", "value": "2,b,1"},
{"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,2"},
{"topic": "OUTPUT", "key": "b|+|2", "value": "2,b,2"},
{"topic": "OUTPUT", "key": "a|+|3", "value": "3,a,1"}
]
},
{
"name": "fields (stream->table) - format",
"statements": [
"CREATE STREAM TEST (f1 INT, f2 VARCHAR) WITH (kafka_topic='test_topic', KEY='f1', value_format='{FORMAT}');",
"CREATE TABLE OUTPUT AS SELECT f1, f2, COUNT(*) FROM TEST GROUP BY f2, f1;"
],
"format": ["AVRO", "JSON"],
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}},
{"topic": "test_topic", "key": 2, "value": {"F1": 2, "F2": "b"}},
{"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}},
{"topic": "test_topic", "key": 2, "value": {"F1": 2, "F2": "b"}},
{"topic": "test_topic", "key": 3, "value": {"F1": 3, "F2": "a"}}
],
"outputs": [
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": {"KSQL_INTERNAL_COL_0": 2, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 2}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": {"KSQL_INTERNAL_COL_0": 2, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 2}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|3", "value": {"KSQL_INTERNAL_COL_0": 3, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 1}},
{"topic": "OUTPUT", "key": "b|+|2", "value": {"F1": 2, "F2": "b", "KSQL_COL_2": 1}},
{"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 2}},
{"topic": "OUTPUT", "key": "b|+|2", "value": {"F1": 2, "F2": "b", "KSQL_COL_2": 2}},
{"topic": "OUTPUT", "key": "a|+|3", "value": {"F1": 3, "F2": "a", "KSQL_COL_2": 1}}
]
},
{
"name": "with groupings (stream->table)",
"statements": [
Expand Down Expand Up @@ -80,6 +143,13 @@
{"topic": "test_topic", "key": 1, "value": "1,a"}
],
"outputs": [
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": "2,b,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,0"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|1", "value": "1,b,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": "2,b,0,0"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|1", "value": "1,b,0,0"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": "1,a,0,1"},
{"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,1"},
{"topic": "OUTPUT", "key": "b|+|2", "value": "2,b,1"},
{"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,0"},
Expand All @@ -89,6 +159,37 @@
{"topic": "OUTPUT", "key": "a|+|1", "value": "1,a,1"}
]
},
{
"name": "fields (table->table) - format",
"statements": [
"CREATE TABLE TEST (f1 INT, f2 VARCHAR) WITH (kafka_topic='test_topic', KEY='f1', value_format='{FORMAT}');",
"CREATE TABLE OUTPUT AS SELECT f1, f2, COUNT(*) FROM TEST GROUP BY f2, f1;"
],
"format": ["AVRO", "JSON"],
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}},
{"topic": "test_topic", "key": 2, "value": {"F1": 2, "F2": "b"}},
{"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "b"}},
{"topic": "test_topic", "key": 2, "value": null},
{"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}}
],
"outputs": [
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": {"KSQL_INTERNAL_COL_0": 2, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 0}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|2", "value": {"KSQL_INTERNAL_COL_0": 2, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 0}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "b|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "b", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 0}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "a|+|1", "value": {"KSQL_INTERNAL_COL_0": 1, "KSQL_INTERNAL_COL_1": "a", "KSQL_INTERNAL_COL_2": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 1}},
{"topic": "OUTPUT", "key": "b|+|2", "value": {"F1": 2, "F2": "b", "KSQL_COL_2": 1}},
{"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 0}},
{"topic": "OUTPUT", "key": "b|+|1", "value": {"F1": 1, "F2": "b", "KSQL_COL_2": 1}},
{"topic": "OUTPUT", "key": "b|+|2", "value": {"F1": 2, "F2": "b", "KSQL_COL_2": 0}},
{"topic": "OUTPUT", "key": "b|+|1", "value": {"F1": 1, "F2": "b", "KSQL_COL_2": 0}},
{"topic": "OUTPUT", "key": "a|+|1", "value": {"F1": 1, "F2": "a", "KSQL_COL_2": 1}}
]
},
{
"name": "field with re-key (stream->table)",
"statements": [
Expand All @@ -103,13 +204,55 @@
{"topic": "test_topic", "value": "d1"}
],
"outputs": [
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": "d1,0"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d2", "value": "d2,0"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": "d1,0"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d2", "value": "d2,0"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": "d1,0"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": "d2,0,1"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,2"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": "d2,0,2"},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": "d1,0,3"},
{"topic": "OUTPUT", "key": "d1", "value": "d1,1"},
{"topic": "OUTPUT", "key": "d2", "value": "d2,1"},
{"topic": "OUTPUT", "key": "d1", "value": "d1,2"},
{"topic": "OUTPUT", "key": "d2", "value": "d2,2"},
{"topic": "OUTPUT", "key": "d1", "value": "d1,3"}
]
},
{
"name": "field with re-key (stream->table) - format",
"statements": [
"CREATE STREAM TEST (data VARCHAR) WITH (kafka_topic='test_topic', value_format='{FORMAT}');",
"CREATE TABLE OUTPUT AS SELECT data, COUNT(*) FROM TEST GROUP BY DATA;"
],
"format": ["AVRO", "JSON"],
"inputs": [
{"topic": "test_topic", "value": {"DATA": "d1"}},
{"topic": "test_topic", "value": {"DATA": "d2"}},
{"topic": "test_topic", "value": {"DATA": "d1"}},
{"topic": "test_topic", "value": {"DATA": "d2"}},
{"topic": "test_topic", "value": {"DATA": "d1"}}
],
"outputs": [
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-groupby-repartition", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 1}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 2}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d2", "value": {"KSQL_INTERNAL_COL_0": "d2", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 2}},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-aggregate-changelog", "key": "d1", "value": {"KSQL_INTERNAL_COL_0": "d1", "KSQL_INTERNAL_COL_1": 0, "KSQL_AGG_VARIABLE_0": 3}},
{"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":1}},
{"topic": "OUTPUT", "key": "d2", "value": {"DATA": "d2", "KSQL_COL_1":1}},
{"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":2}},
{"topic": "OUTPUT", "key": "d2", "value": {"DATA": "d2", "KSQL_COL_1":2}},
{"topic": "OUTPUT", "key": "d1", "value": {"DATA": "d1", "KSQL_COL_1":3}}
]
},
{
"name": "field with re-key (table->table)",
"statements": [
Expand Down
Loading

0 comments on commit a9efa39

Please sign in to comment.