-
Notifications
You must be signed in to change notification settings - Fork 11.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RIP-63] Queue Selection Strategy Optimization #6568
Conversation
public void reset() { | ||
int index = Math.abs(random.nextInt()); | ||
if (index < 0) | ||
index = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the code redundant
@@ -79,6 +89,10 @@ public class ClientConfig { | |||
*/ | |||
protected boolean enableStreamRequestType = false; | |||
|
|||
private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be prudent to incorporate a warning concerning order message within the document,may be order message is disturbed
@@ -2846,6 +2849,37 @@ public void setMessageRequestMode(final String brokerAddr, final String topic, f | |||
throw new MQClientException(response.getCode(), response.getRemark()); | |||
} | |||
} | |||
public Properties queryRemoteClientConfig(long timeoutMillis) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may be increased namesrv server load
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will remove this design.
* Remoting Client Common configuration | ||
*/ | ||
public class RemoteClientConfig { | ||
private final static Logger log = LoggerFactory.getLogger(MQClientInstance.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems more appropriate to write it like this:
private final static Logger log = LoggerFactory.getLogger(RemoteClientConfig.class);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, It should be. But I think I will remove this RemoteClientConfig...
The switches in clientConfig are enough.
Some experiment results: https://docs.google.com/document/d/1uTB_5bp6YmoghGUpFabwYsMertP7IBlNCDx9r787sOU/edit?usp=sharing |
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
Show resolved
Hide resolved
proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
Outdated
Show resolved
Hide resolved
# Conflicts: # client/src/main/java/org/apache/rocketmq/client/ClientConfig.java # proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java # proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java # proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java # proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java # proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
# Conflicts: # proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
# Conflicts: # client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
# Conflicts: # client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java # proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java # proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
return sendResultList; | ||
}, this.executor); | ||
}, this.executor) | ||
.exceptionally(t -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use whenComplete is better. When an exception occurs, the upper layer function will get null in thenApply
final int i = this.randomItem.incrementAndGet() % half; | ||
return tmpList.get(i).getName(); | ||
Collections.shuffle(tmpList); | ||
//Collections.sort(tmpList); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
annotation could be removed
}, this.executor); | ||
}, this.executor) | ||
.whenComplete((result, exception) -> { | ||
endTimestamp.set(System.currentTimeMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need to use AtomicLong for endTimestamp
, just a local variable would be fine
proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Make sure set the target branch to
develop
What is the purpose of the change
[RIP-63] Issue #6567 .
Optimize the proxy's and client's selection strategy for brokers when sending messages, and use multiple selection strategies as a pipeline to filter suitable queues.
Brief changelog
Client:
MQFaultStrategy
andLatencyFaultTolerance
. These changes implemented queues's filters, which will provide more flexible selecting strategies.Proxy:
selectOneByPipeline()
inMessageQueueSelector
. It will call the filters in mqFaultStrategy to select suitable queue while sending messages.select()
in SendMessageQueueSelector, now it will call theselectOneByPipeline()
, but not theselectOne()
before.(PS: If the switch is not opened, it will also run asselectOne()
).Verifying this change
XXXX
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR
.[ISSUE #123] Fix UnknownException when host config not exist
. Each commit in the pull request should have a meaningful subject line and body.mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle
to make sure basic checks pass. Runmvn clean install -DskipITs
to make sure unit-test pass. Runmvn clean test-compile failsafe:integration-test
to make sure integration-test pass.