From 2c738c50ecfc5d6feecc5567b81f9aa46d447c38 Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Thu, 16 Jan 2020 10:59:14 +0000 Subject: [PATCH 1/5] added test --- .../resources/query-validation-tests/foo.json | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/foo.json diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/foo.json b/ksql-functional-tests/src/test/resources/query-validation-tests/foo.json new file mode 100644 index 000000000000..27450ac4ee8b --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/foo.json @@ -0,0 +1,20 @@ +{ + "tests": [ + { + "name": "should not blow up on null key", + "statements": [ + "CREATE TABLE INPUT (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE TABLE OUTPUT as SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": null, "value": "2"}, + {"topic": "test_topic", "key": "1", "value": "1"}, + {"topic": "test_topic", "key": "1", "value": "3"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": "1"}, + {"topic": "OUTPUT", "key": "1", "value": "3"} + ] + } + ] +} From 82f23495bae88897c19d38de03d4c39343988e5c Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Sun, 19 Jan 2020 16:46:28 +0000 Subject: [PATCH 2/5] fix: Make sure null key is preserved during serialization/deserialization --- .../java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java index feae69a9e17c..c60f1ee45104 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java @@ -116,7 +116,7 @@ private static final class RowSerializer implements Serializer { @Override public byte[] serialize(final String topic, final Object struct) { - final Object value = ((Struct) struct).get(field); + final Object value = struct == null ? null : ((Struct) struct).get(field); return delegate.serialize(topic, value); } } @@ -142,7 +142,7 @@ public Struct deserialize(final String topic, final byte[] bytes) { final Object primitive = delegate.deserialize(topic, bytes); final Struct struct = new Struct(schema); struct.put(field, primitive); - return struct; + return primitive == null ? null : struct; } catch (final Exception e) { throw new SerializationException( "Error deserializing DELIMITED message from topic: " + topic, e); From cf9a3a516ab6613c4f7e8538bc699f26f033c4fe Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Sun, 19 Jan 2020 17:12:29 +0000 Subject: [PATCH 3/5] fixed merge conflict --- .../resources/query-validation-tests/foo.json | 20 ------------------- .../query-validation-tests/table.json | 16 +++++++++++++++ 2 files changed, 16 insertions(+), 20 deletions(-) delete mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/foo.json diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/foo.json b/ksql-functional-tests/src/test/resources/query-validation-tests/foo.json deleted file mode 100644 index 27450ac4ee8b..000000000000 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/foo.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "tests": [ - { - "name": "should not blow up on null key", - "statements": [ - "CREATE TABLE INPUT (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED');", - "CREATE TABLE OUTPUT as SELECT * FROM INPUT;" - ], - "inputs": [ - {"topic": "test_topic", "key": null, "value": "2"}, - {"topic": "test_topic", "key": "1", "value": "1"}, - {"topic": "test_topic", "key": "1", "value": "3"} - ], - "outputs": [ - {"topic": "OUTPUT", "key": "1", "value": "1"}, - {"topic": "OUTPUT", "key": "1", "value": "3"} - ] - } - ] -} diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/table.json b/ksql-functional-tests/src/test/resources/query-validation-tests/table.json index b10e34d09aac..b971f5e3b8e6 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/table.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/table.json @@ -74,6 +74,22 @@ {"topic": "OUTPUT", "key": "1", "value": null}, {"topic": "OUTPUT", "key": "1", "value": "2"} ] + }, + { + "name": "should not blow up on null key", + "statements": [ + "CREATE TABLE INPUT (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE TABLE OUTPUT as SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": "1"}, + {"topic": "test_topic", "key": null, "value": "2"}, + {"topic": "test_topic", "key": "1", "value": "3"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": "1"}, + {"topic": "OUTPUT", "key": "1", "value": "3"} + ] } ] } From a8c591b2386a8d9aa7569140f7f690b71c42b8bd Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Sun, 19 Jan 2020 19:56:44 +0000 Subject: [PATCH 4/5] updated test --- .../confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java index d6839db609e8..9ab1beead718 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.name.ColumnName; @@ -148,6 +149,7 @@ public void shouldThrowOnValidateIfStruct() { @Test public void shouldHandleNulls() { shouldHandle(SqlTypes.INTEGER, null); + shouldHandle(SqlTypes.STRING, null); } @Test @@ -192,7 +194,11 @@ private void shouldHandle(final SqlType fieldSchema, final Object value) { final Object result = serde.deserializer().deserialize("topic", bytes); // Then: - assertThat(result, is(struct)); + if (value == null) { + assertThat(result, is(nullValue())); + } else { + assertThat(result, is(struct)); + } } private static PersistenceSchema schemaWithFieldOfType(final SqlType fieldSchema) { From aff74b09867d0fedfc4bb1af4dd68729844113ef Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Mon, 20 Jan 2020 18:50:02 +0000 Subject: [PATCH 5/5] Review updates --- .../ksql/serde/kafka/KafkaSerdeFactory.java | 5 ++- .../serde/kafka/KafkaSerdeFactoryTest.java | 34 ++++++++++++++----- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java index c60f1ee45104..153a584678c8 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java @@ -140,9 +140,12 @@ private static final class RowDeserializer implements Deserializer { public Struct deserialize(final String topic, final byte[] bytes) { try { final Object primitive = delegate.deserialize(topic, bytes); + if (primitive == null) { + return null; + } final Struct struct = new Struct(schema); struct.put(field, primitive); - return primitive == null ? null : struct; + return struct; } catch (final Exception e) { throw new SerializationException( "Error deserializing DELIMITED message from topic: " + topic, e); diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java index 9ab1beead718..bb12a1638824 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java @@ -147,9 +147,31 @@ public void shouldThrowOnValidateIfStruct() { } @Test - public void shouldHandleNulls() { - shouldHandle(SqlTypes.INTEGER, null); - shouldHandle(SqlTypes.STRING, null); + public void shouldSerializeNullAsNull() { + // Given: + final PersistenceSchema schema = schemaWithFieldOfType(SqlTypes.INTEGER); + + final Serde serde = factory.createSerde(schema, ksqlConfig, srClientFactory); + + // When: + final byte[] result = serde.serializer().serialize("topic", null); + + // Then: + assertThat(result, is(nullValue())); + } + + @Test + public void shouldDeserializeNullAsNull() { + // Given: + final PersistenceSchema schema = schemaWithFieldOfType(SqlTypes.INTEGER); + + final Serde serde = factory.createSerde(schema, ksqlConfig, srClientFactory); + + // When: + final Object result = serde.deserializer().deserialize("topic", null); + + // Then: + assertThat(result, is(nullValue())); } @Test @@ -194,11 +216,7 @@ private void shouldHandle(final SqlType fieldSchema, final Object value) { final Object result = serde.deserializer().deserialize("topic", bytes); // Then: - if (value == null) { - assertThat(result, is(nullValue())); - } else { - assertThat(result, is(struct)); - } + assertThat(result, is(struct)); } private static PersistenceSchema schemaWithFieldOfType(final SqlType fieldSchema) {