Filter out nulls before group by and select key operations to avoid NPE #927
LGTM.
This will avoid the NPE as stated, but I don't think a null value should be possible here, since all values are GenericRow instances. We need to find where the null value is created.
@@ -240,7 +240,7 @@ public SchemaKStream selectKey(final Field newKeyField, boolean updateRowKey) {
   }

-    KStream keyedKStream = kstream.selectKey((key, value) -> {
+    KStream keyedKStream = kstream.filter((key, value) -> value != null).selectKey((key, value) -> {
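For readers without the full file at hand, the effect of the added filter can be sketched in plain Java. This is only an illustration: it uses java.util.stream as a stand-in for the Kafka Streams topology, and the class name, the selectKeys helper, and the use of List<Object> in place of GenericRow are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class NullValueFilterSketch {

    // Hypothetical stand-in: each row is a list of column values (not the real GenericRow).
    static List<String> selectKeys(List<List<Object>> rows, int keyIndex) {
        return rows.stream()
                // Drop null rows first, so the key extraction below never dereferences null.
                .filter(Objects::nonNull)
                .map(row -> row.get(keyIndex).toString())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Object>> rows = new ArrayList<>();
        rows.add(Arrays.<Object>asList("k1", 1));
        rows.add(null); // without the filter, this row would cause an NPE in map()
        rows.add(Arrays.<Object>asList("k2", 2));
        System.out.println(selectKeys(rows, 0)); // prints [k1, k2]
    }
}
```

Note that this sketch still fails if the key column itself holds null, which is the case raised in the next comment.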
While we're at it, should we also fix the NPE that I think would be thrown on line 248 if the value in the key column is null?
Good catch @rodesai, I agree that it would be nice to fix it as part of this patch.
sure. will do
@hjafarpour the |
LGTM, assuming the cost of the double extractColumn is negligible.
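The "double extractColumn" concern can be illustrated with a small sketch. It assumes the updated filter checks both the row and the key column for null, and that the key selector then extracts the same column a second time; the actual patch may differ. The extractColumn below is a hypothetical stand-in with a simplified signature, and java.util.stream again stands in for the Kafka Streams topology.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class NullKeyColumnSketch {

    // Hypothetical stand-in for the extractColumn helper referenced in the diff.
    static Object extractColumn(List<Object> row, int keyIndex) {
        return row.get(keyIndex);
    }

    static List<String> selectKeys(List<List<Object>> rows, int keyIndex) {
        return rows.stream()
                // First extraction: reject null rows and rows whose key column is null.
                .filter(row -> row != null && extractColumn(row, keyIndex) != null)
                // Second extraction: safe to call toString() because of the filter above.
                .map(row -> extractColumn(row, keyIndex).toString())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Object>> rows = new ArrayList<>();
        rows.add(Arrays.<Object>asList("k1", 1));
        rows.add(Arrays.<Object>asList(null, 2)); // null key column
        rows.add(null);                           // null row
        rows.add(Arrays.<Object>asList("k3", 3));
        System.out.println(selectKeys(rows, 0)); // prints [k1, k3]
    }
}
```

In this stand-in each extraction is a single list lookup. If the real helper also resolves the field index by name on every call, that lookup would be repeated per record, which is presumably the cost being weighed here.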
@@ -133,7 +133,7 @@ public SchemaKStream select(final Schema selectSchema) {
       List<Object> newColumns = new ArrayList<>();
       for (Field schemaField : selectSchema.fields()) {
         newColumns.add(
-            row.getColumns().get(SchemaUtil.getFieldIndexByName(schema, schemaField.name()))
+            extractColumn(schemaField, row)
nit: all on a single line now?
LGTM, thanks!
LGTM
Fixes #521