-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize memory usage: support to shrink for pendingAcks map #14515
Conversation
category = CATEGORY_SERVER, | ||
doc = "Whether ConcurrentLongLongPairHashMap supports automatic shrinking," + | ||
"the default is false, which means automatic shrinking is not supported.") | ||
private boolean autoShrink = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't add this knob.
and if we really want the name should be more narrow and point exactly to the thing that we are changing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this configuration? Or modify the configuration name? like: autoShrinkForConcurrentLongLongPairHashMap?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is only for pendingAcks at the moment, so the name should reflect it.
it is not for every ConcurrentLongLongPairHashMap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
like this?
autoShrinkForPendingAcks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. PTAL,thanks! @eolivelli
rename autoShrink to autoShrinkForConsumerPendingAcksMap;
* @param autoShrink whether ConcurrentLongLongPairHashMap supports automatic shrinking. | ||
* @return | ||
*/ | ||
CompletableFuture<Consumer> subscribe(SubscriptionOption option, boolean autoShrink); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that adding a new method for "autoShrink" is not necessary.
also the variable name should be more meaningful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean adding the autoShrink parameter directly on the original method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is global parameter, maybe you have another way to pass this way.
every Consumer will use the same value.
if you add a "parameter" in the function it means that it would be possible to see different values and this is not true
I believe that overall we don't need a configuration flag at all.
your change is good and you demonstrated that it is needed
cc @merlimat
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is global parameter, maybe you have another way to pass this way.
every Consumer will use the same value.
if you add a "parameter" in the function it means that it would be possible to see different values and this is not true
OK , I will fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that overall we don't need a configuration flag at all.
Do you think the autoShrink configuration item should not be added? When defining pendingAcks, set it directly to true? For example:
this.pendingAcks = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(true)
.expectedItems(256)
.concurrencyLevel(1)
.build();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. PTAL,thanks! @eolivelli
In the ServerCnx class, define a static variable autoShrinkForConsumerPendingAcksMap instead of passing parameters;
/pulsarbot run-failure-checks |
@@ -704,6 +704,12 @@ | |||
) | |||
private boolean isAllowAutoUpdateSchemaEnabled = true; | |||
|
|||
@FieldContext( | |||
category = CATEGORY_SERVER, | |||
doc = "Whether ConcurrentLongLongPairHashMap supports automatic shrinking," |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
supports -> enable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And let's add some description about when it's better to enable this auto shinking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
* | ||
* <p>Keys <strong>MUST</strong> be >= 0. | ||
*/ | ||
public class ConcurrentLongLongPairHashMap { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to copy unit test too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@Jason918 PTAL,thanks!
/pulsarbot run-failure-checks |
2 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are closer.
but we cannot use a 'static' variable
@@ -205,6 +205,7 @@ | |||
private boolean autoReadDisabledPublishBufferLimiting = false; | |||
private final long maxPendingBytesPerThread; | |||
private final long resumeThresholdPendingBytesPerThread; | |||
public static boolean autoShrinkForConsumerPendingAcksMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this shouldn't be static
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition to defining static variables, or passing parameters through methods, I have not thought of other ways. @eolivelli Do you have any other good ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static boolean autoShrinkForConsumerPendingAcksMap; | |
@Getter | |
private boolean autoShrinkForConsumerPendingAcksMap; |
I think this works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this works.
The question here is: how do we pass autoShrinkForConsumerPendingAcksMap to the Consumer class, if we don't define autoShrinkForConsumerPendingAcksMap as static and pass the method parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, I found a solution, the configuration can be accessed in this way in the consumer class:
subscription.getTopic().getBrokerService()
.getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@eolivelli PTAL,thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The question here is: how do we pass autoShrinkForConsumerPendingAcksMap to the Consumer class, if we don't define autoShrinkForConsumerPendingAcksMap as static and pass the method parameters.
What I mean is like the handling of preciseDispatcherFlowControl
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
great work !
@Jason918 PTAL,thanks! |
@merlimat PTAL,thanks! |
ping |
(cherry picked from commit e747b8f)
(cherry picked from commit e747b8f)
(cherry picked from commit e747b8f)
Modifications
issue: #14268
In order to optimize memory consumption, in the consumer class, the map used by the pendingAcks variable needs to support automatic shrink.
The original pendingAcks variable is defined by the ConcurrentLongLongPairHashMap class of bookkeeper. In order to be independent of the bookkeeper release version, this class is extracted into pulsar as a common class, similar to the ConcurrentLongHashMap class.
See the PR corresponding to bookkeeper: apache/bookkeeper#3061.
Documentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)