[fix][broker] Fix one potential to add duplicated consumer #20583
You say in the linked issue that you have seen this twice.
I guess this is one of the possible causes of a duplicated consumer, and this PR corresponds to our production case. I can't affirm that the duplicated consumer problem will not occur anymore.
@poorbarcode @eolivelli @lhotari @congbobo184 Can you help review this PR? It covers another possibility besides #15051.
@@ -1184,7 +1184,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
     ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
             String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
                     remoteAddress, subscriptionName));
-    consumers.remove(consumerId, existingConsumerFuture);
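The line under review is a conditional remove: the entry is dropped only if the consumer id still maps to that exact future. A minimal sketch of that behaviour follows. It assumes `java.util.concurrent.ConcurrentHashMap` as a stand-in for the broker's consumer map, and the class and method names are hypothetical, not taken from `ServerCnx`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ConditionalRemoveSketch {
    // Hypothetical stand-in for the per-connection map: consumer id -> pending consumer future.
    private final ConcurrentHashMap<Long, CompletableFuture<String>> consumers =
            new ConcurrentHashMap<>();

    /** Registers the future unless one is already present; returns the existing future or null. */
    public CompletableFuture<String> register(long consumerId, CompletableFuture<String> future) {
        return consumers.putIfAbsent(consumerId, future);
    }

    /** Removes the entry only if the id still maps to this exact future. */
    public boolean removeIfSame(long consumerId, CompletableFuture<String> future) {
        return consumers.remove(consumerId, future);
    }

    public static void main(String[] args) {
        ConditionalRemoveSketch cnx = new ConditionalRemoveSketch();

        // First request registers a future, which later fails.
        CompletableFuture<String> first = new CompletableFuture<>();
        cnx.register(1L, first);
        first.completeExceptionally(new IllegalStateException("connection closed"));

        // Second request finds the failed future and removes it.
        CompletableFuture<String> existing = cnx.register(1L, new CompletableFuture<>());
        System.out.println("found failed future: " + existing.isCompletedExceptionally());
        System.out.println("removed: " + cnx.removeIfSame(1L, existing));

        // Once the failed future is gone, a later request with the same id registers again.
        CompletableFuture<String> third = new CompletableFuture<>();
        System.out.println("re-registered: " + (cnx.register(1L, third) == null));

        // A stale remove that still holds the old future no longer matches and is a no-op.
        System.out.println("stale remove: " + cnx.removeIfSame(1L, first));
    }
}
```

The point of the sketch is only that removing a failed future reopens the slot for the same consumer id, which is the window the discussion below is about.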
There are three clients in your example. Each pulsar client maintains its own connection pool, which means client1 will not use the same connection as client2 or client3.
The code you changed is in ServerCnx (this object is not shared across multiple clients), so it will not affect the consumers of other clients.
Additional background:
If you have two pulsar clients, the consumers registered will be like this:
- pulsar-client-1
  - connection-1 (one-to-one relationship with ServerCnx)
    - consumer-1
    - consumer-2
  - connection-2
    - consumer-3
    - consumer-4
- pulsar-client-2
  - connection-3
    - consumer-5
    - consumer-6
  - connection-4
    - consumer-7
    - consumer-8
So only consumer-1 and consumer-2 can conflict with each other, and since their consumer-id is not the same, all things are OK.
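The scoping argument above can be sketched as one consumer map per connection, so an id can only collide with ids registered on the same connection. This is an illustrative model only: the class name, method name, and string keys are hypothetical and do not come from the broker code.

```java
import java.util.HashMap;
import java.util.Map;

public class PerConnectionRegistrySketch {
    // connection name -> (consumer id -> consumer name); one inner map per connection.
    private final Map<String, Map<Long, String>> connections = new HashMap<>();

    /** Returns false only when the id is already taken on the same connection. */
    public boolean register(String connection, long consumerId, String consumerName) {
        Map<Long, String> consumers =
                connections.computeIfAbsent(connection, k -> new HashMap<>());
        return consumers.putIfAbsent(consumerId, consumerName) == null;
    }

    public static void main(String[] args) {
        PerConnectionRegistrySketch registry = new PerConnectionRegistrySketch();
        // Different ids on the same connection do not conflict.
        System.out.println(registry.register("connection-1", 1L, "consumer-1"));
        System.out.println(registry.register("connection-1", 2L, "consumer-2"));
        // The same id on another connection does not conflict either.
        System.out.println(registry.register("connection-3", 1L, "consumer-5"));
        // Only a repeated id on the same connection is rejected.
        System.out.println(registry.register("connection-1", 1L, "consumer-1-again"));
    }
}
```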
I guess it is one client sending the request three times, because in our server log "Subscribing on topic" occurs three times within a short time from the same remoteAddress host:ip. So are they on the same connection?
could you provide the logs?
From the log, we cannot confirm whether these three consumers are the same, because we do not know their consumer id. This log was improved in the PR https://github.com/apache/pulsar/pull/20568/files#diff-1e0e8195fb5ec5a6d79acbc7d859c025a9b711f94e6ab37c94439e99b3202e84R1168
Timeline (actors: req-1, req-2, close connection):

| time | step |
|---|---|
| 1 | put a new consumerFuture |
| 2 | close connection |
| 3 | mark the consumerFuture as failed |
| 4 | close the connection |
| 5 | got consumerFuture (failed) |
| 6 | remove the failed future |
| 7 | add the consumer into the list and set (list.size = 1 and set.size = 1) |
| 8 | put the second consumerFuture |
| 9 | add the second consumer into the list and set (list.size = 2 and set.size = 1) |
| 10 | remove the consumer from the list and set (list.size = 1 and set.size = 0) |

Do you mean that the three requests executed as above?
- before step-4 and after step-4, there are two different connections
- if req-1 runs on the first connection, it will trigger another consumer.close. See: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L896
  - (Highlight) But it may leave an orphan consumer in the consumer list, see: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L203. I think you can write a test to reproduce this case.
- if req-1 runs on the second connection, all things are OK, just like the comment above: [fix][broker] Fix one potential to add duplicated consumer #20583 (comment)
Yes
BTW, there is a PR trying to fix concurrent subscribe calls in the same client.
I still think what the current PR is trying to solve is meaningful, and it would be nice to have a test that can reproduce it.
7bbfba5 to e4f561b
e4f561b to fbebced
Added a test to verify this concurrent case and the fix. PTAL, thanks. @codelipenghui @eolivelli @lhotari @poorbarcode
The PR had no activity for 30 days; marked with the Stale label.
I had the same problem on a Flink task, and it was fixed after trying this PR.
@poorbarcode @codelipenghui It seems this problem is not fixed in the master branch.
#22283 has fixed the same problem. Closing this PR.
Fixes #20576
Motivation
One case results in a duplicated consumer: if we add the same consumer twice in a row and then close the consumer, the offline consumer remains in consumerList, because AbstractDispatcherMultipleConsumers#consumerList is an ArrayList while AbstractDispatcherMultipleConsumers#consumerSet is a Set.
A concurrency problem in ServerCnx#handleSubscribe causes this case. Suppose the client calls handleSubscribe() three times and then handleCloseConsumer(). The execution order ends up as addConsumer->addConsumer->close rather than addConsumer->close->addConsumer, so the duplicated consumer problem occurs.
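The list/set mismatch described above can be reproduced with plain Java collections. The sketch below is a simplified model, not the dispatcher code: the class and method names are hypothetical, and a `String` stands in for the consumer object.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateConsumerSketch {
    // Hypothetical stand-ins for consumerList and consumerSet.
    private final List<String> consumerList = new ArrayList<>();
    private final Set<String> consumerSet = new HashSet<>();

    public void addConsumer(String consumer) {
        consumerList.add(consumer); // a list keeps duplicates
        consumerSet.add(consumer);  // a set ignores a repeated element
    }

    public void removeConsumer(String consumer) {
        consumerList.remove(consumer); // removes only the first matching entry
        consumerSet.remove(consumer);
    }

    public int listSize() {
        return consumerList.size();
    }

    public int setSize() {
        return consumerSet.size();
    }

    public static void main(String[] args) {
        DuplicateConsumerSketch dispatcher = new DuplicateConsumerSketch();
        dispatcher.addConsumer("consumer-1");
        dispatcher.addConsumer("consumer-1"); // same consumer added twice
        dispatcher.removeConsumer("consumer-1"); // single close
        // prints "list=1 set=0": one stale entry survives in the list
        System.out.println("list=" + dispatcher.listSize() + " set=" + dispatcher.setSize());
    }
}
```

With the order addConsumer->close->addConsumer instead, the same model ends with one entry in both collections, which is why the ordering matters.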
Matching PR in forked repository
PR in forked repository: TakaHiR07#10