Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JsonDeserializer couldn't work in the kafka sink connector #44

Open
RaghadAlkhudhair opened this issue Feb 13, 2024 · 1 comment
Open

Comments

@RaghadAlkhudhair
Copy link

I am using the json deserializer on a kafka connector using debezuim and the kafka is deployed on a docker image.

the plugin was installed successfully but when I try to use the smt using the following code added to the sink connector :

transforms: "json"
transforms.json.type: "com.birdie.kafka.connect.smt.DebeziumJsonDeserializer"
transforms.json.optional-struct-fields: true

I get the following error saying it couldn't cast kafka sink to kafka sourcse:


│       State:  FAILED                                                                                                                                                                                                                          │
│       Trace:  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler                                                                                                                                           │
│               at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)                                                                                                   │
│               at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)                                                                                                              │
│               at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)                                                                                                                                      │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:552)                                                                                                                           │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:505)                                                                                                                                     │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:341)                                                                                                                                                │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)                                                                                                                                           │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)                                                                                                                                             │
│               at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)                                                                                                                                                       │
│               at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)                                                                                                                                                         │
│               at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)                                                                                                                                │
│               at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)                                                                                                                                            │
│               at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)                                                                                                                                                           │
│               at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)                                                                                                                                    │
│               at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)                                                                                                                                    │
│               at java.base/java.lang.Thread.run(Thread.java:833)                                                                                                                                                                              │
│ Caused by: java.lang.ClassCastException: class org.apache.kafka.connect.sink.SinkRecord cannot be cast to class org.apache.kafka.connect.source.SourceRecord (org.apache.kafka.connect.sink.SinkRecord and org.apache.kafka.connect.source.So │
│ urceRecord are in unnamed module of loader 'app')                                                                                                                                                                                             │
│   at com.birdie.kafka.connect.smt.DebeziumJsonDeserializer.apply(DebeziumJsonDeserializer.java:26)                                                                                                                                            │
│   at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)                                                                                                                                                  │
│   at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)                                                                                                                                         │
│   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)                                                                                                                     │
│   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)                                                                                                               │
│   ... 15 more

could you support please??

@gtsopour
Copy link

Hello @RaghadAlkhudhair
did you manage to find a solution on this or did you use any other smts? Additionally is this plugin publicly available that I can refer it on my KafkaConnect setup or I have to build/host it by myself?

Thanks in advance for your support.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants