
Add support for generic message keys (e.g. message keys in Avro or JSON format, STRUCT) #824

Closed
apurvam opened this issue Feb 28, 2018 · 16 comments
Assignees
Labels
data-accessibility enhancement P0 Denotes must-have for a given milestone record-keys
Milestone

Comments

@apurvam
Contributor

apurvam commented Feb 28, 2018

Right now, KSQL only uses a string serde for keys in Kafka messages. This works well for doing joins and aggregates on the entire key. However, in some cases a key (in JSON, Avro, or any other format) may have nested fields, and we may want to do joins and aggregates on a per-field basis.

Further, by supporting structured keys, we can do filters on the fields of the keys as well.

This is related to #638 and #804.

We should also support non-string keys, such as Integer.
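To illustrate the gap, here is a minimal Python sketch (hypothetical helper, not KSQL code) of the kind of per-field key extraction a user has to do by hand today when a JSON-serialized key has nested fields:

```python
import json

def extract_key_field(raw_key: str, path):
    """Parse a JSON-serialized message key and return one nested field --
    the kind of per-field join/filter key this issue asks KSQL to support
    natively. (Hypothetical helper, not KSQL syntax.)"""
    obj = json.loads(raw_key)
    for segment in path:
        obj = obj[segment]
    return obj

# A JSON key with a nested field:
raw = '{"user": {"id": 42}, "region": "emea"}'
print(extract_key_field(raw, ["user", "id"]))  # -> 42
```

With native structured-key support, this extraction would instead be expressed directly in the query, and the field could also be used in WHERE clauses.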

@mquraishi

Additionally: 1) If the value (not the key) is an Avro container-format object with the schema visible in clear text but the payload itself a byte array, KSQL cannot deserialize it correctly. In code this is easy to handle once the message is retrieved from the topic. 2) If the value is JSON and one of the elements is another nested JSON object, creating a stream turns the nested JSON into a dictionary-style key=value object, resulting in deserialization errors.

@miguno
Contributor

miguno commented Sep 5, 2018

For the record: Another +1 from a user (offline conversation).

@miguno miguno changed the title Add support for structured keys (e.g. message keys in Avro or JSON format) Add support for generic message keys (e.g. message keys in Avro or JSON format, STRUCT) Dec 5, 2018
@big-andy-coates
Contributor

Hi @mquraishi,

> Additionally: 1) If the value (not the key) is an Avro container-format object with the schema visible in clear text but the payload itself a byte array, KSQL cannot deserialize it correctly. In code this is easy to handle once the message is retrieved from the topic. 2) If the value is JSON and one of the elements is another nested JSON object, creating a stream turns the nested JSON into a dictionary-style key=value object, resulting in deserialization errors.

I think you're talking about an Avro-serialized value where the schema is serialized as part of the message, right? At the moment I don't believe KSQL supports such a serialization format. KSQL does not expect the schema to be provided in the record; it expects the schema to come from the Schema Registry or to be provided by the user when importing the data into KSQL.

Supporting embedded schemas would be a different issue from this one. If you're still interested in seeing this worked on, would you mind detailing it in a new GitHub issue, please?

@big-andy-coates
Contributor

big-andy-coates commented Feb 4, 2019

Please find attached a document comparing two potential approaches we're looking into for structured key support in KSQL. We are looking for feedback from the community on the two approaches. We want to move on this fairly quickly: a decision on direction is likely to be taken by the end of this week, i.e. Friday 8th Feb. Please leave any comments below. In addition, you can use the two additional comments below to vote for one of the approaches.

Structured Keys Comparison.pdf

We look forward to hearing from you!

Additional examples that answer questions asked below but are not covered by the main document:

Keys with 2+ levels of nesting

Let's say the key looks like:

{
    "x" : {
        "y": 1
    },
    "z": 2
}

(And the value again has two fields v0 and v1).

Logical

The logical approach only flattens the first level of the key, so the above could be imported with:

CREATE STREAM FOO (x STRUCT<y INT> KEY, z INT KEY, v0 INT, V1 INT) ...

And used in a query like:

SELECT x->y, z FROM FOO;

Physical

The physical approach defines actual structure:

CREATE STREAM FOO (ROWKEY STRUCT<x STRUCT<y INT>, z INT>, ROWVALUE STRUCT<v0 INT, v1 INT>) ...

And used in a query exactly as above, e.g.:

SELECT x->y, z FROM FOO;
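To make the difference concrete, here is a small Python sketch (hypothetical helper names, not KSQL code) of how the logical approach flattens only the first level of key fields into top-level columns, while deeper nesting stays as a struct:

```python
def flatten_logical(key: dict, value: dict) -> dict:
    """Logical approach: first-level key fields become top-level columns
    alongside the value columns; deeper nesting stays as a struct."""
    columns = dict(key)    # x (still a struct) and z become columns
    columns.update(value)  # v0 and v1 are columns as before
    return columns

key = {"x": {"y": 1}, "z": 2}
value = {"v0": 10, "v1": 20}
print(flatten_logical(key, value))
# -> {'x': {'y': 1}, 'z': 2, 'v0': 10, 'v1': 20}
```

Under the physical approach the key would instead stay wrapped in a single ROWKEY struct column, but the query syntax over the fields ends up the same.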

@big-andy-coates
Contributor

big-andy-coates commented Feb 4, 2019

Click here if you prefer the more Logical approach.

Please leave a thumbs-up emoji on this comment if you prefer the more logical approach covered in the above document.

@big-andy-coates
Contributor

big-andy-coates commented Feb 4, 2019

Click here if you prefer the more Physical approach.

Please leave a thumbs-up emoji on this comment if you prefer the more physical approach covered in the above document.

@miguno
Contributor

miguno commented Feb 5, 2019

@big-andy-coates : Thanks for sharing the comparison PDF.

Two questions for the logical approach:

  • DDL syntax for defining a structured key with 2+ levels of nesting? The PDF shows only structured keys with 1 level of nesting.
  • Query syntax for working with such structured keys with 2+ levels of nesting? I suppose that in this scenario one must also use -> and not just .

The reason I am asking is that the logical approach flattens the first level of a structured key, but only the first level. And the PDF only shows examples with 1-level nested keys.

@big-andy-coates
Contributor

big-andy-coates commented Feb 5, 2019

@miguno : The second level of nesting would continue to work as it currently does for both approaches, hence I hadn't included it, as I was trying to keep the document succinct. This is also why I've not covered join semantics or windowing.

But as you've asked, it probably means I should have included them ;) . I've added an example above.

@mquraishi

> Hi @mquraishi,
>
> > Additionally: 1) If the value (not the key) is an Avro container-format object with the schema visible in clear text but the payload itself a byte array, KSQL cannot deserialize it correctly. In code this is easy to handle once the message is retrieved from the topic. 2) If the value is JSON and one of the elements is another nested JSON object, creating a stream turns the nested JSON into a dictionary-style key=value object, resulting in deserialization errors.
>
> I think you're talking about an Avro-serialized value where the schema is serialized as part of the message, right? At the moment I don't believe KSQL supports such a serialization format. KSQL does not expect the schema to be provided in the record; it expects the schema to come from the Schema Registry or to be provided by the user when importing the data into KSQL.
>
> Supporting embedded schemas would be a different issue from this one. If you're still interested in seeing this worked on, would you mind detailing it in a new GitHub issue, please?

@big-andy-coates I will do that soon. It would essentially mean breaking up the concerns I reported here; I think that's what you are asking for. In general, I was hoping the flexibility embedded schemas offer, albeit at the cost of performance, could be useful. Inferring the schema at runtime can be costly.

@big-andy-coates
Contributor

Thanks for the feedback everyone. We'll be looking to implement the more logical model.

@Deninc

Deninc commented Apr 14, 2020

> Hello folks, the work is in progress. The current goal is to ship support for primitive keys (long, int) serialized with Avro and JSON in the 5.4 release.

Sorry to bother you, but I'm still not sure if we can do this yet:

{"name": "hello!",
 "inner": {"id": "12345"}}

CREATE TABLE <X> WITH (..., KEY='inner->id')

From #1975
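Until a nested key path like inner->id is supported natively, one workaround (a sketch of a hypothetical pre-processing step outside KSQL, not KSQL syntax) is to re-key records yourself before they land in the topic the table is built from:

```python
import json

def rekey(record_value: str, key_path):
    """Pull a nested field out of a JSON value to use as the new message
    key (hypothetical pre-processing outside KSQL, not KSQL syntax)."""
    obj = json.loads(record_value)
    for segment in key_path:
        obj = obj[segment]
    return str(obj)

print(rekey('{"name": "hello!", "inner": {"id": "12345"}}', ["inner", "id"]))
# -> 12345
```

A table created over the re-keyed topic can then use the extracted id as its ordinary key column.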

@agavra
Contributor

agavra commented Dec 11, 2020

We're finally ready to close this one out!

Huge shout out to everyone involved (cc @big-andy-coates @vcrfxia especially) - it's been a long haul. There's some future feature work related to this (like #6371 and multi-column pull query support) but you can now declare your streams with any key type (so long as the underlying format supports it) and any format supported by ksql.

@agavra agavra closed this as completed Dec 11, 2020
@mquraishi

> We're finally ready to close this one out!
>
> Huge shout out to everyone involved (cc @big-andy-coates @vcrfxia especially) - it's been a long haul. There's some future feature work related to this (like #6371 and multi-column pull query support) but you can now declare your streams with any key type (so long as the underlying format supports it) and any format supported by ksql.

Thank you!
