-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Make the whole grabCnx() progress atomic #20595
[fix][client] Make the whole grabCnx() progress atomic #20595
Conversation
With this patch, we don't need any synchronization when implementing |
There are some failed tests, I will fix them ASAP. |
/pulsarbot run-failure-checks |
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I made new changes to avoid race condition described in #20595 (comment) The root cause is that for a cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
.thenAccept(__ -> duringConnect.set(false))
.exceptionally(this::handleConnectionError); All possible cases are:
Ideally, PTAL again. @poorbarcode @lifepuzzlefun @tisonkun @Technoboy- |
### Motivation In `ConnectionHandler`, there is a `Connection` interface whose methods will be called after the connection in `grabCnx` is established, the implementation of `Connection` might send some requests after the connection is established. For example, the consumer will send the `CommandSubscribe` request in `connectionOpened`. However, the whole process is not atomic, which leads to the message lost reported in apache#20591. ### Modifications Modify the `Connection` interface to have a single method: ```java CompletableFuture<Void> handleNewConnection(ClientCnx cnx, PulsarClientException e); ``` The returned future should be completed once the implementation has done everything, e.g. for the consumer, the future should only be completed after receiving the response for `CommandSubscribe`. In `grabCnx`, the `ConnectionHandler` could only connect to the broker once the whole process is completed. Add `ConnectionHandlerTest` to verify the behavior.
dc9c062
to
6116870
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point. LGTM.
When there are some reviewers you may try to do accumulative commits instead of force push; thus it's easy to review the changeset compared to the last review..
You can "merge master" instead of rebase and the final "squash and merge" would squash everything into one commit for you.
I know it so I didn't push it by force before. But since there were many new commits from the approved commit before, I think it might be better to review for a completely new commit. |
The failed test |
Codecov Report
@@ Coverage Diff @@
## master #20595 +/- ##
============================================
+ Coverage 72.60% 73.13% +0.53%
- Complexity 32018 32076 +58
============================================
Files 1855 1869 +14
Lines 138569 138883 +314
Branches 15250 15273 +23
============================================
+ Hits 100605 101578 +973
+ Misses 29945 29262 -683
- Partials 8019 8043 +24
Flags with carried forward coverage won't be shown. Click here to find out more.
|
@poorbarcode Thanks for your update. Merging... Then you can rebase your PR onto this one :D |
@BewareMyPower, the |
Oh, my fault. We can cherry-pick this PR to branch 2.10. TopicListWatcher.java is not part of the core modification. |
(cherry picked from commit 2bede01)
Motivation
In
ConnectionHandler
, there is aConnection
interface whose methods will be called after the connection ingrabCnx
is established, the implementation ofConnection
might send some requests after the connection is established. For example, the consumer will send theCommandSubscribe
request inconnectionOpened
. However, the whole process is not atomic, which leads to the message lost reported in #20591.Modifications
Modify the
Connection#connectionOpened
interface to return aCompletableFuture
.The returned future should be completed once the implementation has done everything, e.g. for the consumer, the future should only be completed after receiving the response for
CommandSubscribe
.In
grabCnx
, theConnectionHandler
could only connect to the broker once the whole process is completed.Add
ConnectionHandlerTest
to verify the behavior.Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: BewareMyPower#29