Filter out nulls before group by and select key operations to avoid NPE #927

Merged · 5 commits · Mar 15, 2018
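The change itself is small: KSQL's `selectKey` and `groupBy` paths both read fields out of the row value to build the new key, so a null value (for example a tombstone, or a DELIMITED payload that failed to parse) threw a `NullPointerException`; the fix filters null rows, and null key columns, out of the stream before rekeying. Below is a minimal sketch of the same pattern on plain Kafka Streams types; the topic name and the `toUpperCase` key extractor are illustrative assumptions, not code from this PR:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class NullSafeRekeyExample {

  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder.stream("input-topic"); // hypothetical topic

    stream
        // Drop null rows first; without this, the selectKey lambda below
        // would call toUpperCase() on null and throw a NullPointerException.
        .filter((key, value) -> value != null)
        .selectKey((key, value) -> value.toUpperCase())
        .groupByKey()
        .count();
  }
}
```

Filtering up front keeps the null handling in one place instead of null-guarding every downstream lambda, at the cost of silently dropping those records.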
SchemaKStream.java

@@ -132,8 +132,7 @@ public SchemaKStream select(final Schema selectSchema) {
     kstream.mapValues(row -> {
       List<Object> newColumns = new ArrayList<>();
       for (Field schemaField : selectSchema.fields()) {
-        newColumns.add(
-            row.getColumns().get(SchemaUtil.getFieldIndexByName(schema, schemaField.name()))
+        newColumns.add(extractColumn(schemaField, row)
         );
       }
       return new GenericRow(newColumns);
@@ -240,14 +239,13 @@ public SchemaKStream selectKey(final Field newKeyField, boolean updateRowKey) {
     }


-    KStream keyedKStream = kstream.selectKey((key, value) -> {
-      String newKey =
-          value
-              .getColumns()
-              .get(SchemaUtil.getFieldIndexByName(schema, newKeyField.name()))
-              .toString();
-      return newKey;
-    }).mapValues((key, row) -> {
+    KStream keyedKStream = kstream.filter((key, value) ->
+        value != null
+            && extractColumn(newKeyField, value) != null
+    ).selectKey((key, value) ->
+        extractColumn(newKeyField, value)
+            .toString()
+    ).mapValues((key, row) -> {
       if (updateRowKey) {
         row.getColumns().set(SchemaUtil.ROWKEY_NAME_INDEX, key);
       }
@@ -265,6 +263,12 @@ public SchemaKStream selectKey(final Field newKeyField, boolean updateRowKey) {
     );
   }

+  private Object extractColumn(Field newKeyField, GenericRow value) {
+    return value
+        .getColumns()
+        .get(SchemaUtil.getFieldIndexByName(schema, newKeyField.name()));
+  }
+
   private String fieldNameFromExpression(Expression expression) {
     if (expression instanceof DereferenceExpression) {
       DereferenceExpression dereferenceExpression =
@@ -316,7 +320,7 @@ public SchemaKGroupedStream groupBy(
           SchemaUtil.getIndexInSchema(groupByExpr.toString(), getSchema()));
     }

-    KGroupedStream kgroupedStream = kstream.groupBy(
+    KGroupedStream kgroupedStream = kstream.filter((key, value) -> value != null).groupBy(
         (key, value) -> {
           StringBuilder newKey = new StringBuilder();
           boolean addSeparator1 = false;
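The `groupBy` fix above is the same guard one step earlier: the key-builder lambda concatenates column values out of the row, so a null row would NPE before the aggregation even ran. A hedged sketch of that shape, with the `StringBuilder` loop reduced to two fields; the topic name and the `List<Object>` value type are assumptions, not KSQL's actual wiring:

```java
import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

public class NullSafeGroupByExample {

  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, List<Object>> rows = builder.stream("rows-topic"); // hypothetical topic

    KGroupedStream<String, List<Object>> grouped = rows
        // Same guard as the PR: null rows never reach the key builder.
        .filter((key, value) -> value != null)
        // Composite key from the selected columns, like the StringBuilder
        // loop in SchemaKStream.groupBy; value.get(...) would NPE on a null row.
        .groupBy((key, value) -> value.get(0) + "|" + value.get(1));

    grouped.count();
  }
}
```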

AggregateNodeTest.java

@@ -55,7 +55,7 @@ public class AggregateNodeTest {
   private StreamsBuilder builder = new StreamsBuilder();

   @Test
-  public void shouldBuildSourceNode() throws Exception {
+  public void shouldBuildSourceNode() {
     build();
     final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE);
     final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
@@ -81,19 +81,19 @@ public void shouldHaveTwoSubTopologies() {
   @Test
   public void shouldHaveSourceNodeForSecondSubtopolgy() {
     buildRequireRekey();
-    final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000009");
+    final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000010");
     final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
     assertThat(node.predecessors(), equalTo(Collections.emptySet()));
-    assertThat(successors, equalTo(Collections.singletonList("KSTREAM-AGGREGATE-0000000006")));
-    assertThat(node.topics(), containsString("[KSTREAM-AGGREGATE-STATE-STORE-0000000005"));
+    assertThat(successors, equalTo(Collections.singletonList("KSTREAM-AGGREGATE-0000000007")));
+    assertThat(node.topics(), containsString("[KSTREAM-AGGREGATE-STATE-STORE-0000000006"));
     assertThat(node.topics(), containsString("-repartition]"));
   }

   @Test
   public void shouldHaveSinkNodeWithSameTopicAsSecondSource() {
     buildRequireRekey();
-    TopologyDescription.Sink sink = (TopologyDescription.Sink) getNodeByName(builder.build(), "KSTREAM-SINK-0000000007");
-    final TopologyDescription.Source source = (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000009");
+    TopologyDescription.Sink sink = (TopologyDescription.Sink) getNodeByName(builder.build(), "KSTREAM-SINK-0000000008");
+    final TopologyDescription.Source source = (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000010");
     assertThat(sink.successors(), equalTo(Collections.emptySet()));
     assertThat("[" + sink.topic() + "]", equalTo(source.topics()));
   }

JoinNodeTest.java

@@ -80,7 +80,7 @@ private void buildJoinNode(String queryString) {
   }

   @Test
-  public void shouldBuildSourceNode() throws Exception {
+  public void shouldBuildSourceNode() {
     final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE);
     final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
     assertThat(node.predecessors(), equalTo(Collections.emptySet()));
@@ -89,9 +89,8 @@ public void shouldBuildSourceNode() throws Exception {
   }

   @Test
-  public void shouldBuildTableNodeWithCorrectAutoCommitOffsetPolicy() throws Exception {
+  public void shouldBuildTableNodeWithCorrectAutoCommitOffsetPolicy() {

-    StreamsBuilder streamsBuilder = mock(StreamsBuilder.class);
     KsqlConfig ksqlConfig = mock(KsqlConfig.class);
     KafkaTopicClient kafkaTopicClient = mock(KafkaTopicClient.class);
     MetastoreUtil metastoreUtil = mock(MetastoreUtil.class);
@@ -150,12 +149,11 @@ public SchemaKStream buildStream(StreamsBuilder builder, KsqlConfig ksqlConfig,
   @Test
   public void shouldHaveLeftJoin() {
     final Topology topology = builder.build();
-    System.out.println(topology.describe());
     final TopologyDescription.Processor leftJoin
-        = (TopologyDescription.Processor) getNodeByName(topology, "KSTREAM-LEFTJOIN-0000000013");
+        = (TopologyDescription.Processor) getNodeByName(topology, "KSTREAM-LEFTJOIN-0000000014");
     final List<String> predecessors = leftJoin.predecessors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
     assertThat(leftJoin.stores(), equalTo(Utils.mkSet("KSTREAM-REDUCE-STATE-STORE-0000000003")));
-    assertThat(predecessors, equalTo(Collections.singletonList("KSTREAM-SOURCE-0000000012")));
+    assertThat(predecessors, equalTo(Collections.singletonList("KSTREAM-SOURCE-0000000013")));
   }

   @Test
ksql-engine/src/test/resources/query-validation-tests.json (48 additions, 0 deletions)

@@ -369,6 +369,34 @@
         {"topic": "REPARTITIONED", "key": "zero", "value": "0,zero,50", "timestamp": 0}
       ]
     },
+    {
+      "name": "partition by with null value",
+      "statements": [
+        "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited', key='ID');",
+        "CREATE STREAM REPARTITIONED AS select name,id from TEST partition by name;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "key": 0, "value": null, "timestamp": 0},
+        {"topic": "test_topic", "key": 0, "value": "0,zero,50", "timestamp": 0}
+      ],
+      "outputs": [
+        {"topic": "REPARTITIONED", "key": "zero", "value": "zero,0", "timestamp": 0}
+      ]
+    },
+    {
+      "name": "partition by with null partition by value",
+      "statements": [
+        "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited', key='ID');",
+        "CREATE STREAM REPARTITIONED AS select name,id from TEST partition by name;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "key": 0, "value": "0,,1", "timestamp": 0},
+        {"topic": "test_topic", "key": 0, "value": "0,zero,50", "timestamp": 0}
+      ],
+      "outputs": [
+        {"topic": "REPARTITIONED", "key": "zero", "value": "zero,0", "timestamp": 0}
+      ]
+    },
     {
       "name": "max double group by",
       "statements": [
@@ -538,6 +566,26 @@
         {"topic": "S2", "key": 0, "value": "{\"TOPK\":[\"c\",\"b\",\"a\"],\"ID\":0}", "timestamp": 0},
         {"topic": "S2", "key": 0, "value": "{\"TOPK\":[\"d\",\"c\",\"b\"],\"ID\":0}", "timestamp": 0}
       ]
+    },
+    {
+      "name": "count group by value",
+      "statements": [
+        "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE double) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');",
+        "CREATE TABLE S2 as SELECT name, count(*) FROM test group by name;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "key": 0, "value": "0,zero,0.0", "timestamp": 0},
+        {"topic": "test_topic", "key": 0, "value": "0,zero,0.0", "timestamp": 0},
+        {"topic": "test_topic", "key": 100, "value": "100,100,0.0", "timestamp": 0},
+        {"topic": "test_topic", "key": 100, "value": null, "timestamp": 0},
+        {"topic": "test_topic", "key": 100, "value": "100,100,0.0", "timestamp": 0}
+      ],
+      "outputs": [
+        {"topic": "S2", "key": "zero", "value": "zero,1", "timestamp": 0},
+        {"topic": "S2", "key": "zero", "value": "zero,2", "timestamp": 0},
+        {"topic": "S2", "key": 100, "value": "100,1", "timestamp": 0},
+        {"topic": "S2", "key": 100, "value": "100,2", "timestamp": 0}
+      ]
     }
   ]
}