Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KSQL LEFT JOIN IS NOT WORKING. #968

Closed
zamirarif opened this issue Mar 16, 2018 · 7 comments
Closed

KSQL LEFT JOIN IS NOT WORKING. #968

zamirarif opened this issue Mar 16, 2018 · 7 comments

Comments

@zamirarif
Copy link

zamirarif commented Mar 16, 2018

Hello Folks,

I have 2 kafka avro topics deal & expense but data had alot of whitespaces to clear that I have created following topic and table with trimmed data.
DEAL_STREAM
EXPENSE_TABLE
ksql> describe EXPENSE_TABLE;

Field | Type

ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)

ksql> describe deal_stream;

Field | Type

ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)

When I execute the following Query its giving me null pointer exception.
I tried following queries.
1:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
2:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;

3:

CREATE STREAM deal_expense_trimmed AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0 where **td.KSQL_COL_0 IS NOT NULL;**


 Message
----------------------------
 Stream created and running
----------------------------
ksql> Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-115" java.lang.NullPointerException
        at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-116" java.lang.NullPointerException
        at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-113" java.lang.NullPointerException
        at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-114" java.lang.NullPointerException
        at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
@rodesai
Copy link
Contributor

rodesai commented Mar 16, 2018

Which version are you running? This should be fixed in the latest master.

@zamirarif
Copy link
Author

@rodesai Hi I am using the latest version of KSQL 0.5 and confluent 4.0.0

@VladMl
Copy link

VladMl commented Mar 19, 2018

I got the same error but the latest master works fine for me.

@zamirarif
Copy link
Author

@VladMl Hi Dude,
I am using the 0.5 which was released in February.

I hope you are running the same version or do you mean building the latest code from the repository.

I tried that but its failing due restriction to use the public repositories in our organization.

@VladMl
Copy link

VladMl commented Mar 19, 2018

@zamirarif
I made a build with the latest code from the repository.
Looks like the issue should have been fixed.

@apurvam
Copy link
Contributor

apurvam commented Mar 20, 2018

This was fixed by #927 .

The patch will be part of the 4.1 release at the end of march / early April.

@apurvam apurvam closed this as completed Mar 20, 2018
@zamirarif
Copy link
Author

@apurvam Thanks for the response & closing the ticket.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants