JSON stream/table with non-uppercase field names returns nulls #2551

Closed
vcrfxia opened this issue Mar 12, 2019 · 2 comments


@vcrfxia
Contributor

vcrfxia commented Mar 12, 2019

Because KSQL's JSON deserializer currently works only with schemas whose field names are all uppercase, if a user defines a JSON stream/table with non-uppercase field names, KSQL cannot read the data in those fields and returns null for them.
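One way to model this limitation is sketched below. It assumes (this is an illustrative assumption, not KSQL's actual code) that the deserializer upper-cases incoming JSON keys and then looks up each schema field by its exact, case-sensitive name. The function `deserialize_pre_fix` is hypothetical. Under that model, an unquoted `f1` (stored as `F1`) matches, while a quoted `"f1"` never can, which is the behavior shown in the examples that follow.

```python
import json
from typing import Any, Dict, List, Optional


def deserialize_pre_fix(raw: str, schema_fields: List[str]) -> Dict[str, Optional[Any]]:
    """Hypothetical model of the pre-fix behavior: incoming JSON keys are
    upper-cased, then each schema field is looked up by its exact name."""
    obj = json.loads(raw)
    upper_keys = {key.upper(): value for key, value in obj.items()}
    # A schema field that is not all uppercase can never match an upper-cased key,
    # so .get() falls back to None (i.e. null).
    return {field: upper_keys.get(field) for field in schema_fields}


row = '{"f1":22,"f2":"foo"}'
print(deserialize_pre_fix(row, ["F1", "F2"]))  # unquoted f1, f2 are stored as F1, F2
print(deserialize_pre_fix(row, ["f1", "f2"]))  # quoted "f1", "f2" keep their case
```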

This leads to a variety of weird behaviors. For example, given this JSON topic with data:

$ ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-json
Created topic test-json.
$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test-json
>{"f1":22,"f2":"foo"}

this works as expected (with auto.offset.reset=earliest):

ksql> CREATE STREAM test_json (f1 INT, f2 VARCHAR) WITH (KAFKA_TOPIC='test-json',VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
ksql> SELECT * FROM test_json;
1552153469429 | null | 22 | foo

but this doesn't:

ksql> CREATE STREAM test_json2 ("f1" INT, "f2" VARCHAR) WITH (KAFKA_TOPIC='test-json',VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
ksql> SELECT * FROM test_json2;
1552153469429 | null | null | null

In particular, those last two values shouldn't be null.

As another example, this limitation of the KSQL JSON deserializer can lead to situations where a SELECT <query> returns the expected data, but a CREATE STREAM ... AS SELECT <query> followed by SELECT * returns nulls.

Suppose I've created a JSON topic and put some data in it:

$ ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
>{"myStruct":{"f1":123,"f2":"hello"}}

I can define a stream and verify the data is there (with auto.offset.reset=earliest):

ksql> CREATE STREAM source (myStruct STRUCT<"f1" INT, "f2" VARCHAR>) with (KAFKA_TOPIC='test',VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
ksql> SELECT * FROM source;
1552144676526 | null | {f1=123, f2=hello}

Then, the following SELECT query prints results as expected:

ksql> SELECT myStruct->"f1", myStruct->"f2" FROM source;
123 | hello

but not as CREATE STREAM ... AS SELECT followed by SELECT *:

ksql> CREATE STREAM sink1 AS SELECT myStruct->"f1", myStruct->"f2" FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT * FROM sink1;
1552144676526 | null | null | null

This happens because, although CREATE STREAM sink1 AS SELECT ... correctly creates and populates the new stream sink1, KSQL cannot read the data back out of sink1, since sink1 has non-uppercase field names:

ksql> DESCRIBE EXTENDED sink1;

Name                 : SINK1
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : SINK1 (partitions: 1, replication: 1)

 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 MYSTRUCT__f1 | INTEGER
 MYSTRUCT__f2 | VARCHAR(STRING)
------------------------------------------
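The column names above can be reproduced with a small sketch. It assumes, based only on the DESCRIBE output in this thread, that unquoted identifiers are upper-cased, quoted identifiers keep their case, and a struct dereference joins the two names with `__`. All function names are hypothetical. The unquoted case (`MYSTRUCT__F1`) is inferred rather than shown in the output above.

```python
def resolve_identifier(token: str) -> str:
    """Quoted identifiers keep their case; unquoted ones are upper-cased."""
    if len(token) >= 2 and token.startswith('"') and token.endswith('"'):
        return token[1:-1]
    return token.upper()


def dereference_column_name(struct_token: str, field_token: str) -> str:
    """Hypothetical: join struct and field names with "__", as seen in DESCRIBE."""
    return resolve_identifier(struct_token) + "__" + resolve_identifier(field_token)


def readable_before_fix(column_name: str) -> bool:
    """Under the pre-fix limitation, only all-uppercase names read back correctly."""
    return column_name == column_name.upper()


for field in ['"f1"', "f1"]:
    name = dereference_column_name("myStruct", field)
    print(name, readable_before_fix(name))
```

This is also why the uppercase workaround helps: dropping the quotes yields an all-uppercase column name.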

Note that there is a simple workaround until these JSON deserializer issues are resolved: use uppercase field names, which is what KSQL produces by default for unquoted identifiers.

For the second example the workaround looks like:

ksql> CREATE STREAM sink2 AS SELECT myStruct->f1, myStruct->f2 FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT * FROM sink2;
1552144676526 | null | 123 | hello
@apigban

apigban commented Sep 1, 2019

I just ran into this. I used "AS" to give some of the keys a quoted, non-uppercase alias, as below:

        SELECT EXTRACTJSONFIELD(Geo, '$') as "Geo",
        EXTRACTJSONFIELD(test, '$') as "test",
        EXTRACTJSONFIELD(check, '$') as "check",
        location as "location",
        command as "command",
        src_ip,
        program_name,
        timestamp as "timestamp",
        host as "host",
        id as "id",
        full_log FROM raw;

CREATE STREAM SRCIP_REKEY WITH (PARTITIONS=1) AS SELECT * FROM intelstream0 PARTITION BY SRC_IP;

gives null results, as shown below:

{
   "Geo": null,
   "test": null,
   "check": null,
   "location": null,
   "command": null,
   "SRC_IP": "192.168.1.11",
   "PROGRAM_NAME": null,
   "timestamp": null,
   "host": null,
   "id": null,
   "FULL_LOG": "Sep  1 13:15:55 buffer-system00 sshd[1452]: Failed password"
}

@agavra
Contributor

agavra commented Oct 28, 2019

Thanks for reporting this @vcrfxia! I believe it is fixed in master; I tested all the examples you provided. Here are the more complicated ones:

ksql> PRINT test2 FROM BEGINNING;
Format:JSON
{"ROWTIME":1572282171712,"ROWKEY":"null","myStruct":{"f1":123,"f2":"hello"}}
{"ROWTIME":1572282334159,"ROWKEY":"null","MYSTRUCT":{"f1":123,"f2":"hello"}}
^CTopic printing ceased

ksql> CREATE STREAM source (myStruct STRUCT<"f1" INT, "f2" VARCHAR>) with (KAFKA_TOPIC='test2',VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> SELECT * FROM SOURCE EMIT CHANGES;
+------------------------------+------------------------------+------------------------------+
|ROWTIME                       |ROWKEY                        |MYSTRUCT                      |
+------------------------------+------------------------------+------------------------------+
|1572282171712                 |null                          |{f1=123, f2=hello}            |
|1572282334159                 |null                          |{f1=123, f2=hello}            |
^CQuery terminated

ksql> SELECT myStruct->"f1", myStruct->"f2" FROM source EMIT CHANGES;
+----------------------------------------------+----------------------------------------------+
|MYSTRUCT__f1                                  |MYSTRUCT__f2                                  |
+----------------------------------------------+----------------------------------------------+
|123                                           |hello                                         |
|123                                           |hello                                         |
^CQuery terminated
ksql> CREATE STREAM sink1 AS SELECT myStruct->"f1", myStruct->"f2" FROM source;

 Message
--------------------------------------------------------------------------------
 Stream SINK1 created and running. Created by query with query ID: CSAS_SINK1_5
--------------------------------------------------------------------------------

ksql> SELECT * FROM sink1 EMIT CHANGES;
+----------------------+----------------------+----------------------+----------------------+
|ROWTIME               |ROWKEY                |MYSTRUCT__f1          |MYSTRUCT__f2          |
+----------------------+----------------------+----------------------+----------------------+
|1572282171712         |null                  |123                   |hello                 |
|1572282334159         |null                  |123                   |hello                 |
^CQuery terminated

Feel free to reopen if this still happens in production. There are still some issues in JSON if you specify multiple case-sensitive fields that map to the same case-insensitive value (e.g. foo and fOO), as described in #3666.
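To see why such fields are a problem, consider any lookup that folds case: distinct field names collapse onto one key. The sketch below is a hypothetical illustration of that collision, not KSQL's implementation; `case_insensitive_collisions` is an invented helper.

```python
from collections import defaultdict
from typing import Dict, List


def case_insensitive_collisions(field_names: List[str]) -> Dict[str, List[str]]:
    """Group field names by their upper-cased form and keep only the groups
    where more than one distinct name folds to the same key."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in field_names:
        groups[name.upper()].append(name)
    return {key: names for key, names in groups.items() if len(names) > 1}


print(case_insensitive_collisions(["foo", "fOO", "bar"]))  # {'FOO': ['foo', 'fOO']}
```

A case-insensitive match cannot tell `foo` and `fOO` apart, so one of them ends up reading the other's value (or null).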

@agavra agavra closed this as completed Oct 28, 2019