Struct dereference on non-uppercase field names returns nulls #2543

Closed
vcrfxia opened this issue Mar 9, 2019 · 2 comments

vcrfxia commented Mar 9, 2019

Suppose I've created a topic and put some struct data in it:

$ ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
>{"myStruct":{"f1":123,"f2":"hello"}}

Then define a stream on the topic and verify the record is there:

ksql> CREATE STREAM source (myStruct STRUCT<"f1" INT, "f2" VARCHAR>) with (KAFKA_TOPIC='test',VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
ksql> set 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change
ksql> SELECT * FROM source;
1552144676526 | null | {f1=123, f2=hello}

Now, dereferencing the struct fields works as expected via SELECT:

ksql> SELECT myStruct->"f1", myStruct->"f2" FROM source;
123 | hello

but not as CREATE STREAM ... AS SELECT followed by SELECT *:

ksql> CREATE STREAM sink1 AS SELECT myStruct->"f1", myStruct->"f2" FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT * FROM sink1;
1552144676526 | null | null | null

Specifically, there should not be nulls in the last two columns of the last output.

Workaround: Name the dereferenced fields.

ksql> CREATE STREAM sink2 AS SELECT myStruct->"f1" AS f1, myStruct->"f2" AS f2 FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT * FROM sink2;
1552144676526 | null | 123 | hello

Another option: Allow the field names in the original struct to be uppercased by KSQL (the default unless quotes are used).

ksql> CREATE STREAM source2 (myStruct STRUCT<f1 INT, f2 VARCHAR>) with (KAFKA_TOPIC='test',VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
ksql> SELECT * FROM source2;
1552144676526 | null | {F1=123, F2=hello}
ksql> SELECT myStruct->f1, myStruct->f2 FROM source2;
123 | hello
ksql> CREATE STREAM sink3 AS SELECT myStruct->f1, myStruct->f2 FROM source2;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT * FROM sink3;
1552144676526 | null | 123 | hello

This bug is present on the current master, as well as previous branches.

Extra info for debugging:

Source topic:

ksql> PRINT 'test' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552144676526,"ROWKEY":"null","myStruct":{"f1":123,"f2":"hello"}}

Stream source:

ksql> DESCRIBE EXTENDED source;

Name                 : SOURCE
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : test (partitions: 1, replication: 1)

 Field    | Type
---------------------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 MYSTRUCT | STRUCT<f1 INTEGER, f2 VARCHAR(STRING)>
---------------------------------------------------

Stream sink1 (results in nulls when printed back):

ksql> DESCRIBE EXTENDED sink1;

Name                 : SINK1
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : SINK1 (partitions: 1, replication: 1)

 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 MYSTRUCT__f1 | INTEGER
 MYSTRUCT__f2 | VARCHAR(STRING)
------------------------------------------

Queries that write into this STREAM
-----------------------------------
CSAS_SINK1_5 : CREATE STREAM sink1 AS SELECT myStruct->"f1", myStruct->"f2" FROM source;
ksql> PRINT sink1 FROM BEGINNING;
Format:JSON
{"ROWTIME":1552144676526,"ROWKEY":"null","MYSTRUCT__f1":123,"MYSTRUCT__f2":"hello"}

^ Notably, data are written to this topic as expected, but KSQL prints them back as nulls.

Stream sink2 (from workaround, prints as expected):

ksql> DESCRIBE EXTENDED sink2;

Name                 : SINK2
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : SINK2 (partitions: 1, replication: 1)

 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 F1      | INTEGER
 F2      | VARCHAR(STRING)
-------------------------------------

Queries that write into this STREAM
-----------------------------------
CSAS_SINK2_6 : CREATE STREAM sink2 AS SELECT myStruct->"f1" AS f1, myStruct->"f2" AS f2 FROM source;
ksql> PRINT sink2 FROM BEGINNING;
Format:JSON
{"ROWTIME":1552144676526,"ROWKEY":"null","F1":123,"F2":"hello"}

Stream source2 (from "another option", with uppercase field names):

ksql> DESCRIBE EXTENDED source2;

Name                 : SOURCE2
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : test (partitions: 1, replication: 1)

 Field    | Type
---------------------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 MYSTRUCT | STRUCT<F1 INTEGER, F2 VARCHAR(STRING)>
---------------------------------------------------

Stream sink3 (from "another option", prints as expected):

ksql> DESCRIBE EXTENDED sink3;

Name                 : SINK3
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : SINK3 (partitions: 1, replication: 1)

 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 MYSTRUCT__F1 | INTEGER
 MYSTRUCT__F2 | VARCHAR(STRING)
------------------------------------------

Queries that write into this STREAM
-----------------------------------
CSAS_SINK3_7 : CREATE STREAM sink3 AS SELECT myStruct->f1, myStruct->f2 FROM source2;
ksql> PRINT sink3 FROM BEGINNING;
Format:JSON
{"ROWTIME":1552144676526,"ROWKEY":"null","MYSTRUCT__F1":123,"MYSTRUCT__F2":"hello"}
vcrfxia added the bug label Mar 9, 2019

vcrfxia commented Mar 9, 2019

The pattern seems to be that KSQL is able to deserialize the data iff the field name does not contain lowercase letters.

For example, this variant of the workaround above doesn't work:

ksql> CREATE STREAM sink4 AS SELECT myStruct->"f1" AS "f1", myStruct->"f2" AS "f2" FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT * FROM sink4;
1552144676526 | null | null | null
ksql> PRINT sink4 FROM BEGINNING;
Format:JSON
{"ROWTIME":1552144676526,"ROWKEY":"null","f1":123,"f2":"hello"}
ksql> DESCRIBE EXTENDED sink4;

Name                 : SINK4
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : SINK4 (partitions: 1, replication: 1)

 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 f1      | INTEGER
 f2      | VARCHAR(STRING)
-------------------------------------
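
The pattern stated at the top of this comment can be written as a small predicate and checked against the sink column names seen so far. This is only a Python sketch of the observed behavior, not KSQL code; `reads_back_ok` is a hypothetical helper name.

```python
def reads_back_ok(field_name: str) -> bool:
    """Observed pattern: values read back only if the name has no lowercase letters."""
    return not any(ch.islower() for ch in field_name)


# Column names taken from the DESCRIBE EXTENDED outputs in this issue,
# paired with whether SELECT * returned real values (True) or nulls (False).
observed = {
    "MYSTRUCT__f1": False,  # sink1
    "F1": True,             # sink2
    "MYSTRUCT__F1": True,   # sink3
    "f1": False,            # sink4
}

for name, worked in observed.items():
    assert reads_back_ok(name) == worked
```

The predicate only summarizes the four cases above; it says nothing about why the lookup fails.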

I think the problem is that KSQL's JSON deserializer only uses uppercase schemas.

Specifically, this issue is an instance of the more general bug that KSQL doesn't deserialize JSON properly in the case of non-uppercase field names. For example, given this JSON topic with data:

$ ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-json
Created topic test-json.
$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test-json
>{"f1":22,"f2":"foo"}

this works as expected (with auto.offset.reset=earliest):

ksql> CREATE STREAM test_json (f1 INT, f2 VARCHAR) WITH (KAFKA_TOPIC='test-json',VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
ksql> SELECT * FROM test_json;
1552153469429 | null | 22 | foo

but this doesn't:

ksql> CREATE STREAM test_json2 ("f1" INT, "f2" VARCHAR) WITH (KAFKA_TOPIC='test-json',VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
ksql> SELECT * FROM test_json2;
1552153469429 | null | null | null

(Those last two values shouldn't be null.)
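
One way to picture the hypothesis about the JSON deserializer is a reader that folds incoming JSON keys to uppercase and then looks up each schema field by its exact, case-preserved name. The Python sketch below is a toy model under that assumption, not KSQL's actual implementation; the `deserialize` function and its upper-casing step are hypothetical.

```python
import json


def deserialize(payload: bytes, schema_fields: list) -> list:
    """Toy model: fold JSON keys to uppercase, then look up each schema field verbatim."""
    record = json.loads(payload)
    upper_keyed = {key.upper(): value for key, value in record.items()}
    # A schema field containing lowercase letters can never match an
    # upper-cased key, so it comes back as None (shown as null in the CLI).
    return [upper_keyed.get(field) for field in schema_fields]


payload = b'{"f1":22,"f2":"foo"}'

# test_json: unquoted column names are uppercased to F1, F2.
print(deserialize(payload, ["F1", "F2"]))  # [22, 'foo']

# test_json2: quoted column names keep their case as f1, f2.
print(deserialize(payload, ["f1", "f2"]))  # [None, None]
```

Under this model the data on the topic is fine and only the read path loses the values, which matches what PRINT shows for sink1 and sink4.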

vcrfxia commented Mar 12, 2019

Confirmed that this bug only occurs with JSON data (and not Avro). Closing this issue in favor of the more general #2551.

vcrfxia closed this as completed Mar 12, 2019