-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] [client] Messages lost when consumer reconnect #20591
[fix] [client] Messages lost when consumer reconnect #20591
Conversation
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); | ||
// Create producer and consumer. | ||
ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING) | ||
.subscriptionType(SubscriptionType.Shared) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you use Shared subcription?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you use Shared subcription?
Just to make this test exactly the same as the example in the motivation( if useExclusive
, pulsar will not record the unackMessages
of the consumers). see: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L1067
(Highlight) Already add all subscription types by the parameter provider.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a question that in which case could a consumer grab connections concurrently?
### Motivation In `ConnectionHandler`, there is a `Connection` interface whose methods will be called after the connection in `grabCnx` is established, the implementation of `Connection` might send some requests after the connection is established. For example, the consumer will send the `CommandSubscribe` request in `connectionOpened`. However, the whole process is not atomic, which leads to the message lost reported in apache#20591. ### Modifications Modify the `Connection` interface to have a single method: ```java CompletableFuture<Void> handleNewConnection(ClientCnx cnx, PulsarClientException e); ``` The returned future should be completed once the implementation has done everything, e.g. for the consumer, the future should only be completed after receiving the response for `CommandSubscribe`. In `grabCnx`, the `ConnectionHandler` could only connect to the broker once the whole process is completed. Add `ConnectionHandlerTest` to verify the behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's a good way to fix the message lost issue. The unit test never simulates the actual case. It only tests calling consumer.connectionOpened
.
Each reconnection should be:
ConnectionHandler
get the connection asynchronously viaPulsarClientImpl#getConnection
- The
Connection
implementation (e.g. theConsumerImpl
here) send some other requests asynchronously.
We should make the whole process atomic. Otherwise, similar issues might happen for other Connection
implementations like ProducerImpl
.
I have opened a PR to thoroughly fix the issue: #20595
Since @BewareMyPower created a new PR to use a good implement to fix this issue, Just like I said in the second step of optimization #20591 (comment), I closed this PR |
### Motivation In `ConnectionHandler`, there is a `Connection` interface whose methods will be called after the connection in `grabCnx` is established, the implementation of `Connection` might send some requests after the connection is established. For example, the consumer will send the `CommandSubscribe` request in `connectionOpened`. However, the whole process is not atomic, which leads to the message lost reported in apache#20591. ### Modifications Modify the `Connection` interface to have a single method: ```java CompletableFuture<Void> handleNewConnection(ClientCnx cnx, PulsarClientException e); ``` The returned future should be completed once the implementation has done everything, e.g. for the consumer, the future should only be completed after receiving the response for `CommandSubscribe`. In `grabCnx`, the `ConnectionHandler` could only connect to the broker once the whole process is completed. Add `ConnectionHandlerTest` to verify the behavior.
Since the PR #20595 does not fix the issue "Messages lost when consumer reconnect", I reopen this PR. see the comment: #20595 (comment) |
Motivation
Background of consumer reconnects
CMD-subscribe
to the brokerflow permits
to broker to incrementavailablePermits
Issue
If the consumer tries to reconnect[1] multi times, it will lose some messages due to a race condition, for example:
availablePermits
availablePermits
(Since a subscription is there, so broker just response success)We can see that the variable
availablePermits
of all the stuck consumers is large than1000
(by default, the max value is1000
), and${availablePermits} + ${msgOutCounter}
is an integer multiple of1000
. It means the broker received more than oneCMD-flow
of the same consumer[2].Why does reconnection execute more than once: there are these scenarios that could trigger reconnection:
cmd-close_consumer
, such asunload topic
,reset clusters
, and so on.Modifications
Discard the task
subscribe
if there is an in-flight subscribeDocumentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x