[fix] [broker] fix how ns-isolation-policy API works for replicated namespaces #23094
Conversation
.filter(namespaceName -> adminClient.namespaces()
        .getPoliciesAsync(namespaceName)
        .thenApply(policies -> policies.replication_clusters.contains(cluster))
        .join())
This would be a blocking operation. It would be better to make it asynchronous.
For the filter to work, I need to call join() to wait for the future to complete. The reason for doing it this way is that the policy is required to decide whether to remove the namespace or not. Can you suggest how I can make it async? Nothing better comes to my mind that doesn't need too much code refactoring...
Can you suggest how I can make it async?
There's a real problem already with the existing code, even without unloading calls. I've explained some of that in the previous comment in #23094 (comment).
One of the problems is that all tenants and namespaces will be listed concurrently at once, without any concurrency limits. That alone will cause problems.
To fix the problem, the solution for making asynchronous calls will need concurrency limits. I'd suggest introducing a dependency on
<dependency>
  <groupId>com.spotify</groupId>
  <artifactId>completable-futures</artifactId>
  <version>0.3.6</version>
</dependency>
and using the https://github.com/spotify/completable-futures/blob/master/src/main/java/com/spotify/futures/ConcurrencyReducer.java class for controlling the concurrency.
The challenge is that this is a systemic problem at a higher level, and solving it in this PR might feel overly complex. However, it's possible to handle it incrementally and refactor later.
For making the code asynchronous without blocking calls, composition is needed by using thenCompose/thenApply.
In this case, it's not trivial, so it requires a bit more thought than usual since the unlimited concurrency problem needs to also be solved.
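To make the suggestion above more concrete, here is a minimal sketch of what a bounded, non-blocking filter could look like using ConcurrencyReducer and CompletableFutures.allAsList from the Spotify library. The PoliciesLookup interface, the limit of 8 concurrent lookups, and the queue size of 10,000 are illustrative assumptions, not code from this PR:

import com.spotify.futures.CompletableFutures;
import com.spotify.futures.ConcurrencyReducer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class NamespaceFilterSketch {

    // Hypothetical helper standing in for adminClient.namespaces().getPoliciesAsync(...)
    // plus the replication_clusters check; not an existing Pulsar interface.
    interface PoliciesLookup {
        CompletableFuture<Boolean> replicatesTo(String namespaceName, String cluster);
    }

    static CompletableFuture<List<String>> filterReplicatedNamespaces(
            List<String> namespaceNames, String cluster, PoliciesLookup lookup) {

        // At most 8 policy lookups in flight, up to 10_000 queued behind them (assumed limits).
        ConcurrencyReducer<String> reducer = ConcurrencyReducer.create(8, 10_000);

        List<CompletableFuture<String>> perNamespace = namespaceNames.stream()
                .map(ns -> reducer.add(() ->
                        // Keep the namespace name when it matches the cluster, null otherwise.
                        lookup.replicatesTo(ns, cluster)
                                .thenApply(matches -> matches ? ns : null)))
                .collect(Collectors.toList());

        // Combine without join(): the caller gets a future of the filtered list.
        return CompletableFutures.allAsList(perNamespace)
                .thenApply(results -> results.stream()
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));
    }
}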
Hey @lhotari, while going through the tests, I realized that it will need more changes to add this flag in the admin client.
Good catch!
Outdated review thread on pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java (resolved).
Yes, agreed on this part, concurrency should be limited. Along with that, we can have this flag to prevent unloading in cases where there are too many namespaces to unload and/or to unload only the bundles which are required (not on the primary/secondary broker group). For the bug fix, I'll move it out into a separate PR so it can be merged quickly.
I have created PR #23100 to address the first point only (the bug part). I'll continue on this PR for limiting the async calls and adding ConcurrencyReducer.
The unbounded concurrent calls are only part of the problem. IMO, the set call should only be touching the "changed/affected" namespaces and unloading those, if at all. It should be very straightforward to compare the list of namespaces matching before and after the change and only unload the delta (newly added and now-removed namespaces) so that their placement can be updated as per the policy. Currently, it can unload hundreds of namespaces in one go, practically bringing the cluster down as it struggles to cope with so many placement calls.
When this feature was added in #8976, there was a flag to control it, but the async work done in #15527 forgot about that and broke the contract; it was later removed in #22449 after being called "deprecated". This feels like a weird loophole to ignore backward compatibility and bring in breaking changes easily :) I believe that the flag was useful, but not as a broker-level config. It should be part of the command itself. I am thinking of a three-way behavior that users may want based on their use cases:
My goal is to tackle this, rather than tackling the general problem of unbounded concurrency, which really is a much wider problem not limited to this particular "logical" flaw/regression. What are everyone else's thoughts?
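For illustration, a tiny sketch of the delta comparison described above, assuming we already have the sets of namespaces that matched the isolation policy before and after the update (class and method names are hypothetical):

import java.util.HashSet;
import java.util.Set;

class IsolationPolicyDeltaSketch {

    // Only the symmetric difference (newly added + now-removed namespaces)
    // needs to be unloaded so that their placement is re-evaluated.
    static Set<String> namespacesToUnload(Set<String> matchedBefore, Set<String> matchedAfter) {
        Set<String> newlyAdded = new HashSet<>(matchedAfter);
        newlyAdded.removeAll(matchedBefore);

        Set<String> nowRemoved = new HashSet<>(matchedBefore);
        nowRemoved.removeAll(matchedAfter);

        Set<String> delta = new HashSet<>(newlyAdded);
        delta.addAll(nowRemoved);
        return delta;
    }
}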
@grssam I agree.
I think that this part is now addressed as part of #23100
That's unfortunate.
Makes sense.
+1
Closing this PR in favor of the PIP raised: #23116
Fixes #23092
Motivation
Modifications
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: iosdev747#1