-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][io] Kafka Source connector maybe stuck #22511
Conversation
@shibd Please add the following content to your PR description and select a checkbox:
|
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Good work @shibd
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
(cherry picked from commit bbff29d)
(cherry picked from commit bbff29d)
(cherry picked from commit bbff29d)
(cherry picked from commit bbff29d)
Motivation
The current implementation of Kafka source connector, that
KafkaRecord
does not implement thefail()
method.pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
Line 222 in bbae607
If
PulsarSink
send a message to pulsar failed, will callrecord.fail()
.pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
Lines 194 to 196 in 43a9898
But because KafkaRecord does not implement it, this
futures
will never end.pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
Line 178 in 495b141
This causes the Kafka consumer to leave the consumer group due to more than
max.poll.interval.ms
.Modifications
fail()
method forKakfaRecord
.CompletableFuture.allOf(futures).get()
support a timeout, set to 2/3 ofmax.poll.interval.ms
. This adjustment is to ensure the interval between twopoll
requests does not exceed Kafkamax.poll.interval.ms
.Verifying this change
throwExceptionBySendFail
andthrowExceptionBySendTimeOut
to cover it.Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: