Understanding the error message outputted by ksql #6303

Closed
eshil-patel opened this issue Sep 25, 2020 · 4 comments

eshil-patel commented Sep 25, 2020

Provide details of the setup you're running
I am running ksqlDB version 0.11.0. The underlying Kafka broker is version 2.4.1 (AWS MSK).

Outline your question
I am trying to perform a join between two streams. Our data in the Kafka topic is nested JSON, with no key. Here are the steps I take:

  1. Create a stream from the original data source, with all the columns that I need
CREATE STREAM topic-stream (payload STRUCT<sf1 STRUCT<sf2 STRUCT<sf3 STRUCT<nestedField1 VARCHAR,nestedField2 VARCHAR, nestedField3 VARCHAR>>>>) WITH (KAFKA_TOPIC = 'topic-name',PARTITIONS = 1, REPLICAS = 1,VALUE_FORMAT= 'JSON'); 
  2. Create two streams from this stream
CREATE STREAM stream1  AS SELECT * FROM topic-stream WHERE payload -> sf1 -> sf2 -> sf3 -> nestedField2 =  '{value1}' ;
CREATE STREAM stream2  AS SELECT * FROM topic-stream WHERE payload -> sf1 -> sf2 -> sf3 -> nestedField2 =  '{value2}' ;
  3. Create a stream that joins on a common field from streams 1 and 2
CREATE STREAM joinStream AS SELECT stream1.payload->sf1->sf2->sf3->nestedField3 FROM stream1 LEFT JOIN stream2 WITHIN 10 seconds ON stream1.payload->sf1->sf2->sf3->nestedField3 = stream2.payload->sf1->sf2->sf3->nestedField3 GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3 HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;

I'm able to execute steps 1 and 2 with no issue, but I get an error on step 3:

The projection contains no value columns.

If I remove the GROUP BY and HAVING clauses and add the rowkey column, I end up with this statement:

CREATE STREAM joinStream AS SELECT rowkey, stream1.payload->sf1->sf2->sf3->nestedField3 FROM stream1 LEFT JOIN stream2 WITHIN 10 seconds ON stream1.payload->sf1->sf2->sf3->nestedField3 = stream2.payload->sf1->sf2->sf3->nestedField3

which is successful.

Any ideas on what I could be doing wrong here or what is the issue?

agavra (Contributor) commented Sep 27, 2020

Hello @eshil-patel, let's examine this query:

CREATE STREAM joinStream 
  AS SELECT 
     stream1.payload->sf1->sf2->sf3->nestedField3 
  FROM stream1 LEFT JOIN stream2 
  WITHIN 10 seconds ON 
      stream1.payload->sf1->sf2->sf3->nestedField3 = 
      stream2.payload->sf1->sf2->sf3->nestedField3 
  GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3 
  HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;

The key for the output is stream1.payload->sf1->sf2->sf3->nestedField3 (which is determined by the GROUP BY clause). That means that when you select only that field and no other, we don't consider that query to have any value columns - it only has "key" columns. If you want a quick workaround, you can add the following to your select:

AS SELECT 
   stream1.payload->sf1->sf2->sf3->nestedField3,  
   AS_VALUE(stream1.payload->sf1->sf2->sf3->nestedField3) AS nestedField3InValue

This is different behavior from standard SQL, which doesn't differentiate between key and value columns. If you want the full details, I recommend you skim this blog: https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/ (specifically the section on aggregations - https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/#maintaining-compatibility-aggregations).

eshil-patel (Author) commented Sep 27, 2020

Hi @agavra, I've tried two variations following your recommendation. The first was:

CREATE STREAM joinStream 
  AS SELECT 
     stream1.payload->sf1->sf2->sf3->nestedField3 ,
     AS_VALUE(stream1.payload->sf1->sf2->sf3->nestedField3) AS nestedField3InValue
  FROM stream1 LEFT JOIN stream2 
  WITHIN 10 seconds ON 
      stream1.payload->sf1->sf2->sf3->nestedField3 = 
      stream2.payload->sf1->sf2->sf3->nestedField3 
  GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3 
  HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;

This got me this error:

Non-aggregate SELECT expression(s) not part of GROUP BY: AS_VALUE(stream1_PAYLOAD->sf1->sf2->sf3->nestedField3). Either add the column to the GROUP BY or remove it from the SELECT.

When I added it to the GROUP BY, I got this message:

The projection contains no value columns

I'm assuming that when I add it to the GROUP BY, the same thing happens as in my original query, and the result has only one key column. Any idea what's happening in the first case?

agavra (Contributor) commented Sep 27, 2020

Ah, it looks like you're running into a bug that will be fixed in the next release (#5906 and #5967): AS_VALUE did not work when the grouping expression was a struct dereference.

As a quick workaround, you can add an aggregate column (e.g. COUNT(*)) to the select instead.
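
A minimal sketch of that workaround, applied to the query from the original question. The names joinTable and matchCount are hypothetical, and the use of CREATE TABLE is an assumption (in ksqlDB a GROUP BY aggregation normally yields a table rather than a stream); neither comes from this thread, and the statement has not been verified against 0.11.0.

```sql
-- Sketch only: COUNT(*) gives the projection a value column,
-- so the grouped field is no longer the only (key) column selected.
-- Assumption: CREATE TABLE, because an aggregation produces a table.
-- joinTable and matchCount are hypothetical names.
CREATE TABLE joinTable AS
  SELECT
    stream1.payload->sf1->sf2->sf3->nestedField3,
    COUNT(*) AS matchCount
  FROM stream1
    LEFT JOIN stream2 WITHIN 10 SECONDS
    ON stream1.payload->sf1->sf2->sf3->nestedField3 =
       stream2.payload->sf1->sf2->sf3->nestedField3
  GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3
  HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;
```

Unlike the AS_VALUE variant, this avoids copying the struct-dereferenced key into the value, which is the case the linked bug affects.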

eshil-patel (Author) commented

@agavra Thanks for the explanation and insight; that answers my question about the query.
