Skip to content

Commit

Permalink
fix: Make null key serialization/deserialization symmetrical (#4351)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Jan 20, 2020
1 parent 4a74d24 commit 2a61acb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@
{"topic": "OUTPUT", "key": "1", "value": null},
{"topic": "OUTPUT", "key": "1", "value": "2"}
]
},
{
"name": "should not blow up on null key",
"statements": [
"CREATE TABLE INPUT (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED');",
"CREATE TABLE OUTPUT as SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "test_topic", "key": "1", "value": "1"},
{"topic": "test_topic", "key": null, "value": "2"},
{"topic": "test_topic", "key": "1", "value": "3"}
],
"outputs": [
{"topic": "OUTPUT", "key": "1", "value": "1"},
{"topic": "OUTPUT", "key": "1", "value": "3"}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private static final class RowSerializer implements Serializer<Object> {

@Override
public byte[] serialize(final String topic, final Object struct) {
final Object value = ((Struct) struct).get(field);
final Object value = struct == null ? null : ((Struct) struct).get(field);
return delegate.serialize(topic, value);
}
}
Expand All @@ -140,6 +140,9 @@ private static final class RowDeserializer implements Deserializer<Object> {
public Struct deserialize(final String topic, final byte[] bytes) {
try {
final Object primitive = delegate.deserialize(topic, bytes);
if (primitive == null) {
return null;
}
final Struct struct = new Struct(schema);
struct.put(field, primitive);
return struct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.name.ColumnName;
Expand Down Expand Up @@ -146,8 +147,31 @@ public void shouldThrowOnValidateIfStruct() {
}

@Test
public void shouldHandleNulls() {
shouldHandle(SqlTypes.INTEGER, null);
public void shouldSerializeNullAsNull() {
// Given:
final PersistenceSchema schema = schemaWithFieldOfType(SqlTypes.INTEGER);

final Serde<Object> serde = factory.createSerde(schema, ksqlConfig, srClientFactory);

// When:
final byte[] result = serde.serializer().serialize("topic", null);

// Then:
assertThat(result, is(nullValue()));
}

@Test
public void shouldDeserializeNullAsNull() {
// Given:
final PersistenceSchema schema = schemaWithFieldOfType(SqlTypes.INTEGER);

final Serde<Object> serde = factory.createSerde(schema, ksqlConfig, srClientFactory);

// When:
final Object result = serde.deserializer().deserialize("topic", null);

// Then:
assertThat(result, is(nullValue()));
}

@Test
Expand Down

0 comments on commit 2a61acb

Please sign in to comment.