From 09845299ab7bcba893ee89374587dbe9763f2801 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Wed, 24 Jul 2019 18:18:25 +0100 Subject: [PATCH] fix: logicalSchema toString() to include key fields (MINOR) (#3123) --- .../ksql/schema/ksql/LogicalSchema.java | 10 ++-- .../ksql/schema/ksql/LogicalSchemaTest.java | 45 +++++++---------- .../ksql/engine/InsertValuesExecutorTest.java | 3 -- .../physical/PhysicalPlanBuilderTest.java | 50 +++++++++---------- .../ksql/structured/SchemaKStreamTest.java | 22 ++++---- 5 files changed, 56 insertions(+), 74 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index 333628c5d2da..d3652e54d86a 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -319,16 +319,12 @@ public String toString() { return toString(FormatOptions.none()); } - @SuppressWarnings("ConstantConditions") public String toString(final FormatOptions formatOptions) { // Meta fields deliberately excluded. - // Key fields excluded for now: - // final String keys = keyFields.stream() - // .map(f -> f.toString(formatOptions) + " " + KEY_KEYWORD) - // .collect(Collectors.joining(", ")); - - final String keys = ""; + final String keys = keyFields.stream() + .map(f -> f.toString(formatOptions) + " " + KEY_KEYWORD) + .collect(Collectors.joining(", ")); final String values = valueFields.stream() .map(f -> f.toString(formatOptions)) diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java index 1fe6a023eca1..d279832269ca 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java @@ -537,33 +537,20 @@ public void shouldExposeAliasedAllFields() { @Test public void shouldConvertSchemaToString() { // Given: - final LogicalSchema schema = LogicalSchema.of( - SchemaBuilder.struct() - .field("f0", SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA) - .field("f1", SchemaBuilder.OPTIONAL_INT32_SCHEMA) - .field("f2", SchemaBuilder.OPTIONAL_INT64_SCHEMA) - .field("f4", SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA) - .field("f5", SchemaBuilder.OPTIONAL_STRING_SCHEMA) - .field("f6", SchemaBuilder - .struct() - .field("a", Schema.OPTIONAL_INT64_SCHEMA) - .optional() - .build()) - .field("f7", SchemaBuilder - .array( - SchemaBuilder.OPTIONAL_STRING_SCHEMA - ) - .optional() - .build()) - .field("f8", SchemaBuilder - .map( - SchemaBuilder.OPTIONAL_STRING_SCHEMA, - SchemaBuilder.OPTIONAL_STRING_SCHEMA - ) - .optional() - .build()) - .build() - ); + final LogicalSchema schema = LogicalSchema.builder() + .valueField("f0", SqlTypes.BOOLEAN) + .valueField("f1", SqlTypes.INTEGER) + .valueField("f2", SqlTypes.BIGINT) + .valueField("f4", SqlTypes.DOUBLE) + .valueField("f5", SqlTypes.STRING) + .valueField("f6", SqlTypes.struct() + .field("a", SqlTypes.BIGINT) + .build()) + .valueField("f7", SqlTypes.array(SqlTypes.STRING)) + .valueField("f8", SqlTypes.map(SqlTypes.STRING)) + .keyField("k0", SqlTypes.BIGINT) + .keyField("k1", SqlTypes.DOUBLE) + .build(); // When: final String s = schema.toString(); @@ -571,6 +558,8 @@ public void shouldConvertSchemaToString() { // Then: assertThat(s, is( "[" + + "`k0` BIGINT KEY, " + + "`k1` DOUBLE KEY, " + "`f0` BOOLEAN, " + "`f1` INTEGER, " + "`f2` BIGINT, " @@ -606,6 +595,7 @@ public void shouldConvertSchemaToStringWithReservedWords() { // Then: assertThat(s, is( "[" + + "ROWKEY STRING KEY, " + "`f0` BOOLEAN, " + "f1 STRUCT<`f0` BIGINT, f1 BIGINT>" + "]")); @@ -626,6 +616,7 @@ public void shouldConvertAliasedSchemaToString() { // Then: assertThat(s, is( "[" + + "`t.ROWKEY` STRING KEY, " + "`t.f0` BOOLEAN" + "]")); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java index 857b5350c0c4..dd2b15155422 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java @@ -114,8 +114,6 @@ public class InsertValuesExecutorTest { @Mock private Serializer keySerializer; @Mock - private Deserializer keyDeserializer; - @Mock private Serde valueSerDe; @Mock private Serializer valueSerializer; @@ -134,7 +132,6 @@ public void setup() { when(valueSerde.createSerde(any(), any(), any())).thenReturn(valueSerDe); when(keySerDe.serializer()).thenReturn(keySerializer); - when(keySerDe.deserializer()).thenReturn(keyDeserializer); when(valueSerDe.serializer()).thenReturn(valueSerializer); when(valueSerDe.deserializer()).thenReturn(valueDeserializer); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index b74894ac1027..a19ecedbe025 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -325,21 +325,21 @@ public void shouldCreateExecutionPlan() { final String planText = metadata.getExecutionPlan(); final String[] lines = planText.split("\n"); assertThat(lines[0], startsWith( - " > [ SINK ] | Schema: [COL0 BIGINT, KSQL_COL_1 DOUBLE, KSQL_COL_2 BIGINT] |")); + " > [ SINK ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, KSQL_COL_1 DOUBLE, KSQL_COL_2 BIGINT] |")); assertThat(lines[1], startsWith( - "\t\t > [ AGGREGATE ] | Schema: [KSQL_INTERNAL_COL_0 BIGINT, " + "\t\t > [ AGGREGATE ] | Schema: [ROWKEY STRING KEY, KSQL_INTERNAL_COL_0 BIGINT, " + "KSQL_INTERNAL_COL_1 DOUBLE, KSQL_AGG_VARIABLE_0 DOUBLE, " + "KSQL_AGG_VARIABLE_1 BIGINT] |")); assertThat(lines[2], startsWith( - "\t\t\t\t > [ PROJECT ] | Schema: [KSQL_INTERNAL_COL_0 BIGINT, " + "\t\t\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, KSQL_INTERNAL_COL_0 BIGINT, " + "KSQL_INTERNAL_COL_1 DOUBLE] |")); assertThat(lines[3], startsWith( - "\t\t\t\t\t\t > [ FILTER ] | Schema: [TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, " + "\t\t\t\t\t\t > [ FILTER ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, " + "TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 STRING, " + "TEST1.COL3 DOUBLE, TEST1.COL4 ARRAY, " + "TEST1.COL5 MAP] |")); assertThat(lines[4], startsWith( - "\t\t\t\t\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, " + "\t\t\t\t\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, " + "TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 STRING, " + "TEST1.COL3 DOUBLE, TEST1.COL4 ARRAY, " + "TEST1.COL5 MAP] |")); @@ -360,11 +360,11 @@ public void shouldCreateExecutionPlanForInsert() { final String[] lines = planText.split("\n"); Assert.assertTrue(lines.length == 3); Assert.assertEquals(lines[0], - " > [ SINK ] | Schema: [COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.S1"); + " > [ SINK ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.S1"); Assert.assertEquals(lines[1], - "\t\t > [ PROJECT ] | Schema: [COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.Project"); + "\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.Project"); Assert.assertEquals(lines[2], - "\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] | Logger: InsertQuery_1.KsqlTopic"); + "\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] | Logger: InsertQuery_1.KsqlTopic"); assertThat(queryMetadataList.get(1), instanceOf(PersistentQueryMetadata.class)); final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) queryMetadataList.get(1); @@ -398,9 +398,9 @@ public void shouldFailInsertIfTheResultSchemaDoesNotMatch() { expectedException.expect(KsqlStatementException.class); expectedException.expect(rawMessage(is( "Incompatible schema between results and sink. Result schema is " - + "[`COL0` BIGINT, `COL1` STRING, `COL2` DOUBLE], " + + "[`ROWKEY` STRING KEY, `COL0` BIGINT, `COL1` STRING, `COL2` DOUBLE], " + "but the sink schema is " - + "[`COL0` BIGINT, `COL1` STRING]."))); + + "[`ROWKEY` STRING KEY, `COL0` BIGINT, `COL1` STRING]."))); // When: execute(CREATE_STREAM_TEST1 + csasQuery + insertIntoQuery); @@ -444,13 +444,13 @@ public void shouldCreatePlanForInsertIntoStreamFromStream() { final String[] lines = planText.split("\n"); assertThat(lines.length, equalTo(3)); assertThat(lines[0], containsString( - "> [ SINK ] | Schema: [ROWTIME BIGINT, ROWKEY STRING, COL0 INTEGER]")); + "> [ SINK ] | Schema: [ROWKEY STRING KEY, ROWTIME BIGINT, ROWKEY STRING, COL0 INTEGER]")); assertThat(lines[1], containsString( - "> [ PROJECT ] | Schema: [ROWTIME BIGINT, ROWKEY STRING, COL0 INTEGER]")); + "> [ PROJECT ] | Schema: [ROWKEY STRING KEY, ROWTIME BIGINT, ROWKEY STRING, COL0 INTEGER]")); assertThat(lines[2], containsString( - "> [ SOURCE ] | Schema: [TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 INTEGER]")); + "> [ SOURCE ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 INTEGER]")); } @Test @@ -489,11 +489,11 @@ public void shouldCheckSinkAndResultKeysDoNotMatch() { final String[] lines = planText.split("\n"); assertThat(lines.length, equalTo(4)); assertThat(lines[0], - equalTo(" > [ REKEY ] | Schema: [COL0 BIGINT, COL1 STRING, COL2 DOUBLE] " + equalTo(" > [ REKEY ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE] " + "| Logger: InsertQuery_1.S1")); - assertThat(lines[1], equalTo("\t\t > [ SINK ] | Schema: [COL0 BIGINT, COL1 STRING, COL2 " + assertThat(lines[1], equalTo("\t\t > [ SINK ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 " + "DOUBLE] | Logger: InsertQuery_1.S1")); - assertThat(lines[2], equalTo("\t\t\t\t > [ PROJECT ] | Schema: [COL0 BIGINT, COL1 STRING" + assertThat(lines[2], equalTo("\t\t\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING" + ", COL2 DOUBLE] | Logger: InsertQuery_1.Project")); } @@ -760,8 +760,7 @@ public void shouldRepartitionLeftStreamIfNotCorrectKey() { .get(0); // Then: - assertThat(result.getExecutionPlan(), - containsString("[ REKEY ] | Schema: [TEST2.ROWTIME BIGINT")); + assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [TEST2.")); } @Test @@ -777,8 +776,7 @@ public void shouldRepartitionRightStreamIfNotCorrectKey() { .get(0); // Then: - assertThat(result.getExecutionPlan(), - containsString("[ REKEY ] | Schema: [TEST3.ROWTIME BIGINT")); + assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [TEST3.")); } @Test @@ -908,7 +906,7 @@ public void shouldRepartitionLeftStreamIfNotCorrectKey_Legacy() { // Then: assertThat(result.getExecutionPlan(), - containsString("[ REKEY ] | Schema: [TEST2.ROWTIME BIGINT")); + containsString("[ REKEY ] | Schema: [TEST2.")); } @Test @@ -926,7 +924,7 @@ public void shouldRepartitionRightStreamIfNotCorrectKey_Legacy() { // Then: assertThat(result.getExecutionPlan(), - containsString("[ REKEY ] | Schema: [TEST3.ROWTIME BIGINT")); + containsString("[ REKEY ] | Schema: [TEST3.")); } @Test @@ -982,9 +980,9 @@ public void shouldRepartitionBothStreamsIfJoiningOnRowKey_Legacy() { // Then: assertThat(result.getExecutionPlan(), - containsString("[ REKEY ] | Schema: [TEST2.ROWTIME BIGINT")); + containsString("[ REKEY ] | Schema: [TEST2.")); assertThat(result.getExecutionPlan(), - containsString("[ REKEY ] | Schema: [TEST3.ROWTIME BIGINT")); + containsString("[ REKEY ] | Schema: [TEST3.")); } @Test @@ -1002,9 +1000,9 @@ public void shouldRepartitionBothStreamsIfJoiningOnRowKeyWhenStreamsHaveNoKeyFie // Then: assertThat(result.getExecutionPlan(), - containsString("[ REKEY ] | Schema: [TEST7.ROWTIME BIGINT")); + containsString("[ REKEY ] | Schema: [TEST7.")); assertThat(result.getExecutionPlan(), - containsString("[ REKEY ] | Schema: [TEST7.ROWTIME BIGINT")); + containsString("[ REKEY ] | Schema: [TEST7.")); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index d88fe2a23a89..fb93b15996d2 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -766,10 +766,10 @@ public void shouldSummarizeExecutionPlanCorrectly() { queryContext.push("source").getQueryContext()); // When/Then: - final String expected = - " > [ SOURCE ] | Schema: [key STRING, val BIGINT] | Logger: query.node.source\n\t" - + "parent plan"; - assertThat(schemaKtream.getExecutionPlan(""), equalTo(expected)); + assertThat(schemaKtream.getExecutionPlan(""), equalTo( + " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | " + + "Logger: query.node.source\n" + + "\tparent plan")); } @Test @@ -787,9 +787,9 @@ public void shouldSummarizeExecutionPlanCorrectlyForRoot() { queryContext.push("source").getQueryContext()); // When/Then: - final String expected = - " > [ SOURCE ] | Schema: [key STRING, val BIGINT] | Logger: query.node.source\n"; - assertThat(schemaKtream.getExecutionPlan(""), equalTo(expected)); + assertThat(schemaKtream.getExecutionPlan(""), equalTo( + " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | " + + "Logger: query.node.source\n")); } @Test @@ -813,11 +813,11 @@ public void shouldSummarizeExecutionPlanCorrectlyWhenMultipleParents() { queryContext.push("source").getQueryContext()); // When/Then: - final String expected = - " > [ SOURCE ] | Schema: [key STRING, val BIGINT] | Logger: query.node.source\n" + assertThat(schemaKtream.getExecutionPlan(""), equalTo( + " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | " + + "Logger: query.node.source\n" + "\tparent 1 plan" - + "\tparent 2 plan"; - assertThat(schemaKtream.getExecutionPlan(""), equalTo(expected)); + + "\tparent 2 plan")); } private void whenCreateJoined() {