KSQL join does not work if table topic key is Avro serialised #894

Closed
rmoff opened this issue Mar 8, 2018 · 5 comments
Comments

@rmoff
Contributor

rmoff commented Mar 8, 2018

  • Confluent Platform 4.0
  • ksql 0.5

Start the KSQL server, then start the KSQL CLI.

Create the people topic implicitly by producing a message to it:

kafka-avro-console-producer --topic people --broker-list localhost:9092 \
 --property parse.key=true \
 --property schema.registry.url=http://localhost:8081 \
 --property key.schema='{"type": "string"}' \
 --property value.schema='{"type":"record", "name":"person", "fields": [{"name":"id","type":"long"}, {"name":"name","type":"string"}]}' 

Enter a message. Note that the key ("1") and the rest of the message must be separated by a tab character:

"1"     {"id":1,"name":"jomoore"}
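With parse.key=true, the Avro console producer splits each input line into a key part and a value part on a separator, which defaults to a tab, and reads each part as Avro JSON against its schema. A minimal Python sketch of that split, assuming the default separator and using json.loads as a stand-in for Avro JSON decoding (the two agree for the simple, non-union schemas used here); split_key_value is a hypothetical helper, not part of any Kafka tool:

```python
import json

# Assumption: the default key separator for parse.key=true is a tab.
KEY_SEPARATOR = "\t"


def split_key_value(line: str, separator: str = KEY_SEPARATOR):
    """Split one console-producer input line into (key, value).

    Only the first separator counts, so tabs inside the value survive.
    json.loads stands in for Avro JSON decoding of non-union types.
    """
    key_text, sep, value_text = line.partition(separator)
    if not sep:
        # e.g. spaces were typed (or pasted) instead of a tab
        raise ValueError("no key separator found; use a tab, not spaces")
    return json.loads(key_text), json.loads(value_text)


key, value = split_key_value('"1"\t{"id":1,"name":"jomoore"}')
print(key, value)
```

The key comes back as the string "1", which the producer then serialises with the Avro key schema rather than as plain text.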

Create the achievement topic and produce a message to it:

kafka-avro-console-producer --topic achievement --broker-list localhost:9092 \
                    --property parse.key=true \
                    --property schema.registry.url=http://localhost:8081 \
                    --property key.schema='{"type": "string"}' \
                    --property value.schema='{"type":"record", "name":"achievement", "fields": [{"name":"person","type":"long"}, {"name":"achievement","type":"string"}]}'

Enter a message. Note that the key ("1") and the rest of the message must be separated by a tab character:

"1"     {"person":1,"achievement":"get ksql working"}

In KSQL, define the table and the stream, then inspect them:

ksql> CREATE TABLE people WITH (KEY='id', VALUE_FORMAT='AVRO', KAFKA_TOPIC='people');

 Message
---------------
 Table created
---------------
ksql> CREATE STREAM achievement WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='achievement');

 Message
----------------
 Stream created
----------------
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> select * from people;
1520539587493 | 1 | 1 | jomoore
^CQuery terminated
ksql> select * from achievement;
1520539711706 | 1 | 1 | get ksql working
^CQuery terminated    

ksql> describe people;

 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 ID      | BIGINT
 NAME    | VARCHAR(STRING)
-------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> describe achievement;

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 PERSON      | BIGINT
 ACHIEVEMENT | VARCHAR(STRING)
-----------------------------------------    
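The KEY='id' property in the CREATE TABLE statement above declares that each message key holds the same value as the ID column. The statement succeeded even though the key was Avro-serialised, so the declaration is evidently not checked against the data. A hypothetical sanity check, sketched in Python under the assumption that a usable key is the plain UTF-8 text of the column value; key_matches_column and the sample key bytes (including schema ID 1) are illustrative only:

```python
def key_matches_column(raw_key: bytes, value: dict, column: str) -> bool:
    """Return True if the raw message key equals the column value as UTF-8 text.

    Hypothetical check: it assumes STRING keys, which is what this issue
    found a join needs.
    """
    try:
        decoded = raw_key.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return decoded == str(value[column])


record_value = {"id": 1, "name": "jomoore"}

# A plain string key, as a STRING-keyed topic would hold it.
plain_ok = key_matches_column(b"1", record_value, "id")

# An Avro-framed key: magic byte, hypothetical schema ID 1, length byte, "1".
avro_ok = key_matches_column(b"\x00\x00\x00\x00\x01\x02" + b"1", record_value, "id")

print(plain_ok, avro_ok)
```

Run against a sample of the people topic, a check like this would flag the Avro-keyed records before the table is ever used in a join.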

Try to join:

ksql> SELECT * FROM achievement a LEFT JOIN people p ON a.person = p.id;
1520539711706 | 1 | 1 | get ksql working | null | null | null | null    

Note the null fields where there should be values from the people table.
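One plausible explanation for the null columns: the Confluent Avro serialiser frames each key as a zero magic byte, a 4-byte big-endian schema ID, and then the Avro binary encoding, where a string is a zigzag-varint length followed by its UTF-8 bytes. The join, however, looks up the table with the stream's PERSON value as a STRING key, so the two never compare equal. The Python sketch below models the table as a dict keyed by raw key bytes; the schema ID of 1 is a made-up placeholder, and the dict lookup is a simplification of the join's state store:

```python
import struct


def zigzag_varint(n: int) -> bytes:
    """Avro long encoding: zigzag, then base-128 varint, low 7 bits first."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def confluent_avro_string(s: str, schema_id: int) -> bytes:
    """Magic byte 0, 4-byte big-endian schema ID, then the Avro-encoded string."""
    data = s.encode("utf-8")
    return b"\x00" + struct.pack(">I", schema_id) + zigzag_varint(len(data)) + data


SCHEMA_ID = 1  # hypothetical; the real ID comes from Schema Registry

# Table state as the join sees it: raw key bytes -> row.
people_table = {confluent_avro_string("1", SCHEMA_ID): {"ID": 1, "NAME": "jomoore"}}

# The stream row's join key, PERSON = 1, as a STRING key.
join_key = str(1).encode("utf-8")

match = people_table.get(join_key)
print(match)  # None, so a LEFT JOIN fills the table columns with null
```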

Validate that data exists for the matching key in both objects:

ksql> select * from achievement where person=1;
1520539711706 | 1 | 1 | get ksql working

ksql> select * from people where id=1;
1520539875001 | 1 | 1 | jomoore

The problem is that KSQL does not support Avro-serialised keys. We can use KSQL itself to rekey the topic with PARTITION BY ID:

ksql> CREATE STREAM PEOPLE_RAW WITH (KAFKA_TOPIC='people',VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------
ksql> SELECT * FROM PEOPLE_RAW;
1520539875001 | 1 | 1 | jomoore
^CQuery terminated
ksql> CREATE STREAM PEOPLE_RAW_REKEY AS SELECT * FROM PEOPLE_RAW PARTITION BY ID;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> CREATE TABLE people_rekeyed WITH (KEY='id', VALUE_FORMAT='AVRO', KAFKA_TOPIC='PEOPLE_RAW_REKEY');

 Message
---------------
 Table created
---------------
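PARTITION BY ID re-emits every row with a new message key taken from the ID column, so the rekeyed table's keys can line up with the stream's join key. A simplified Python model of that step, under the assumption that the new key is the UTF-8 text of the column value; rekey is an illustrative name, not a KSQL API, and the sample key uses a hypothetical schema ID of 1:

```python
def rekey(records, column):
    """Yield (new_key, value) pairs keyed on `column`, rendered as UTF-8 text.

    Models the key change only; real repartitioning also routes each record
    to a partition chosen from the new key.
    """
    for _old_key, value in records:
        yield str(value[column]).encode("utf-8"), value


# One record with an Avro-framed key (hypothetical schema ID 1).
people_raw = [(b"\x00\x00\x00\x00\x01\x02" + b"1", {"ID": 1, "NAME": "jomoore"})]

people_rekeyed = dict(rekey(people_raw, "ID"))

join_key = str(1).encode("utf-8")  # the stream's PERSON value as a STRING key
print(people_rekeyed.get(join_key))
```

The old key is discarded entirely, which is why the rekeyed topic works even though the original key was Avro-framed.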

Now let's test the join again…

ksql> select * from people_rekeyed;
1520539875001 | 1 | 1 | jomoore
^CQuery terminated
ksql> SELECT a.person,p.name FROM achievement a LEFT JOIN people_rekeyed p ON a.person = p.id;
1 | jomoore

Happy days :)

So if the key of the topic underpinning the table in a join is Avro-serialised, the join will not work: the key format must be STRING.

Related: #824

@rodesai
Contributor

rodesai commented Mar 8, 2018

Thanks for looking at this @rmoff. The only part that doesn't make sense to me is that KSQL seems to be able to parse the key correctly:

ksql> select * from people;
1520539587493 | 1 | 1 | jomoore

If the key is serialized Avro, I would expect it to look like junk on the console. Maybe the junk surrounding "1" just isn't being rendered.
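That guess fits the byte layout of a Confluent Avro key: as long as the schema ID is small, every byte except the final "1" is a control character, which most terminals print as nothing. A Python sketch that decodes the key as text, as a STRING key deserialiser would, then drops non-printable characters to mimic the console; the schema ID of 1 is an assumption, and a larger ID could contain printable bytes:

```python
# Magic byte 0, hypothetical schema ID 1 (4 bytes, big-endian),
# Avro string length 1 (zigzag-encoded as 0x02), then "1".
raw_key = b"\x00" + (1).to_bytes(4, "big") + b"\x02" + b"1"

text = raw_key.decode("utf-8")  # what a STRING deserialiser would produce
visible = "".join(ch for ch in text if ch.isprintable())

print(repr(text))    # the full key, control characters included
print(visible)       # roughly what a terminal that hides control characters shows
print(text == "1")   # the key is not actually equal to "1"
```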

@rodesai
Contributor

rodesai commented Mar 8, 2018

I'm going to close this. Issue #847 captures the need for better documentation of the restrictions on the KEY.

@rodesai rodesai closed this as completed Mar 8, 2018
@zamirarif

Hi Team,
I am using Avro and my key schema for the topics is string, but I am getting a NullPointerException when running joins. I can query the individual streams and tables, but joins still do not work after trying the solution you provided. I have raised another ticket for the issue; please have a look:
#968 (comment)

@entechlog

@rmoff @rodesai We are facing this issue when joining a stream and a table. Is there a fix or workaround?

@OneCricketeer

@entechlog Please read #824
