LEFT JOIN query results in java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed #305

Closed
alexhafner opened this issue Sep 22, 2017 · 2 comments

@alexhafner

We first ran into this with one of our own queries, and have since reproduced it with the pageviews stream from the quickstart:

CREATE STREAM pageviews \
   (viewtime BIGINT, \
    userid VARCHAR, \
    pageid VARCHAR) \
   WITH (kafka_topic='pageviews', \
         value_format='DELIMITED', \
         key='pageid', \
         timestamp='viewtime');

create table pageviews_avg as select userid, sum(viewtime) / count(*) as avg_viewtime, min(viewtime) as min_viewtime, max(viewtime) as max_viewtime, count(*) as view_count from pageviews window session (60 seconds) group by userid;

SELECT p.userid, p.viewtime, pa.avg_viewtime FROM pageviews p LEFT JOIN pageviews_avg pa ON p.userid = pa.userid;

On the SELECT statement, KSQL fails with:

java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
Query terminated


and the CLI log shows these errors:

[2017-09-22 12:23:20,633] ERROR stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Streams application error during processing:  (org.apache.kafka.streams.processor.internals.StreamThread:527)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
	at org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
	at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
	at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
[2017-09-22 12:23:20,660] WARN stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Unexpected state transition from RUNNING to DEAD. (org.apache.kafka.streams.processor.internals.StreamThread:985)
[2017-09-22 12:23:21,028] ERROR Exception occurred while writing to connection stream:  (io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter:105)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
	at org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
	at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
	at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
[2017-09-22 12:23:21,032] WARN stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Unexpected state transition from DEAD to PENDING_SHUTDOWN. (org.apache.kafka.streams.processor.internals.StreamThread:985)
[2017-09-22 12:23:21,036] ERROR java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
 (io.confluent.ksql.cli.console.Console:130)
@hjafarpour hjafarpour self-assigned this Sep 25, 2017
@hjafarpour
Contributor

@alexhafner The problem is that the table in your LEFT JOIN is a windowed table, so its key is a windowed key that combines the aggregate key with the window start time. The key of your stream, on the other hand, is userid, which is a plain string field, and the join condition expects both keys to be of the same type in order to compare them. Currently we only support LEFT JOIN when the table is not a windowed table.
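
To illustrate the key-type mismatch described above, here is a minimal, self-contained Java sketch. It does not use the Kafka Streams API: `WindowedKey`, `WindowedKeySerializer`, and `WindowedJoinDemo` are hypothetical stand-ins invented for this example, and the real `WindowedSerializer` is more involved. The sketch only assumes what the stack trace suggests, namely that the table's state store serializes lookup keys as windowed keys, so a plain String key from the stream side fails at the cast.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class (not part of KSQL or Kafka Streams).
final class WindowedJoinDemo {
    // Describes what happens when the windowed store is asked for the given key.
    static String lookup(Object key) {
        try {
            byte[] raw = new WindowedKeySerializer().serialize(key);
            return "serialized " + raw.length + " bytes";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Stream side of the join: the key is a plain String such as a userid.
        System.out.println(lookup("User_1"));
        // Table side of the join: the key carries the window start time as well.
        System.out.println(lookup(new WindowedKey("User_1", 1506082995954L)));
    }
}

// Hypothetical stand-in for a windowed key: aggregate key plus window start time.
final class WindowedKey {
    final String key;
    final long windowStartMs;

    WindowedKey(String key, long windowStartMs) {
        this.key = key;
        this.windowStartMs = windowStartMs;
    }
}

// Hypothetical stand-in for a serializer that only understands windowed keys.
final class WindowedKeySerializer {
    byte[] serialize(Object key) {
        // Assumption: the cast is where a non-windowed key is rejected.
        WindowedKey windowed = (WindowedKey) key;
        String encoded = windowed.key + "@" + windowed.windowStartMs;
        return encoded.getBytes(StandardCharsets.UTF_8);
    }
}
```

In this sketch the String lookup ends in a ClassCastException while the windowed key serializes normally, which mirrors why the stream's String key cannot be used to probe the windowed table's store.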

@miguno miguno closed this as completed Oct 30, 2017
@kirankvgit

I'm also getting the same error. How can we fix the query to make it work? @hjafarpour
