Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-63] Queue Selection Strategy Optimization #6568

Merged
merged 48 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
a4f01fc
add latency files.
GenerousMan Mar 28, 2023
96f90ca
modify the logic of selectOneMessageQueue
GenerousMan Mar 28, 2023
63cbbac
faultStrategy modify.
GenerousMan Mar 28, 2023
4bfe568
modify updateFaultItems.
GenerousMan Mar 29, 2023
da61ed2
add remoting switch
GenerousMan Mar 29, 2023
8ddf4a2
start the detector
GenerousMan Apr 3, 2023
19bb1d9
modify the config
GenerousMan Apr 3, 2023
e22b40b
modify the config
GenerousMan Apr 4, 2023
f0a4b73
prepare the proxy modification
GenerousMan Apr 4, 2023
ab08b2d
proxy's latencyTolerance done, then we need filters.
GenerousMan Apr 6, 2023
fde2dea
fix style.
GenerousMan Apr 6, 2023
9ee6dab
test the proxy
GenerousMan Apr 7, 2023
d4216c1
finish proxy's faultStrategy.
GenerousMan Apr 7, 2023
542369e
remove useless files.
GenerousMan Apr 7, 2023
19bd944
checkstyle.
GenerousMan Apr 7, 2023
ae362da
add exceptionally logic.
GenerousMan Apr 10, 2023
472ecc9
fix unit test's bug.
GenerousMan Apr 10, 2023
4c01506
add a unit test.
GenerousMan Apr 11, 2023
83f55e4
remove useless methods.
GenerousMan Apr 11, 2023
0d83652
Merge branch 'develop' into queueFilter
GenerousMan Apr 11, 2023
98516ac
Merge branch 'develop' of https://github.com/apache/rocketmq into que…
GenerousMan Apr 11, 2023
ded23c3
refactor
GenerousMan Apr 12, 2023
cf9fe17
remove useless import and methods.
GenerousMan Apr 12, 2023
3841201
optimized the reset method.
GenerousMan Apr 12, 2023
ffe22e5
still using getMaxOffset() to detect brokers' status.
GenerousMan Apr 12, 2023
ee385d8
make BrokerFilter static, to avoid the find-bug's notification.
GenerousMan Apr 17, 2023
b3ef18f
add some comment to describe the switch 'sendLatencyEnable'.
GenerousMan Apr 18, 2023
f1f52f9
Put TopicRouteService into queueSelector, instead of passing into sel…
GenerousMan Apr 23, 2023
27dd55f
fix checkstyle.
GenerousMan Apr 23, 2023
2610988
fix the test case 'testSendNormalMessageQueueSelector'.
GenerousMan Apr 23, 2023
8535cd9
optimize the queueSelector's init function.
GenerousMan Apr 25, 2023
0732b2c
Merge branch 'develop' into queueFilter_new
GenerousMan Jun 20, 2023
1957f69
solve conflict.
GenerousMan Jun 20, 2023
27ce8f6
Merge branch 'develop' into queueFilter_new
GenerousMan Jun 25, 2023
02c560d
polish.
GenerousMan Jul 3, 2023
481ba50
Merge branch 'develop' into queueFilter_new
GenerousMan Jul 3, 2023
774f486
fix conflict.
GenerousMan Jul 3, 2023
e9334eb
polish
GenerousMan Jul 3, 2023
6b39ab5
polish
GenerousMan Jul 11, 2023
3a43bde
polish
GenerousMan Jul 11, 2023
70efe49
Merge branch 'develop' into queueFilter_new
GenerousMan Jul 20, 2023
196a642
fix wrong modification.
GenerousMan Aug 25, 2023
74d6405
Merge branch 'develop' into queueFilter_new
GenerousMan Aug 25, 2023
90320fa
make the switch false default.
GenerousMan Aug 25, 2023
ce56323
use whenComplete instead of exceptionally.
GenerousMan Aug 25, 2023
1ec969b
modify the requested changes.
GenerousMan Aug 29, 2023
0924aae
checkstyle.
GenerousMan Aug 29, 2023
57279e8
fix test bug.
GenerousMan Aug 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class ClientConfig {
public static final String SOCKS_PROXY_CONFIG = "com.rocketmq.socks.proxy.config";
public static final String DECODE_READ_BODY = "com.rocketmq.read.body";
public static final String DECODE_DECOMPRESS_BODY = "com.rocketmq.decompress.body";
public static final String SEND_LATENCY_ENABLE = "com.rocketmq.sendLatencyEnable";
public static final String START_DETECTOR_ENABLE = "com.rocketmq.startDetectorEnable";
public static final String HEART_BEAT_V2 = "com.rocketmq.heartbeat.v2";
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
private String clientIP = NetworkUtil.getLocalAddress();
Expand Down Expand Up @@ -72,6 +74,8 @@ public class ClientConfig {
private String socksProxyConfig = System.getProperty(SOCKS_PROXY_CONFIG, "{}");

private int mqClientApiTimeout = 3 * 1000;
private int detectTimeout = 200;
private int detectInterval = 2 * 1000;

private LanguageCode language = LanguageCode.JAVA;

Expand All @@ -81,6 +85,15 @@ public class ClientConfig {
*/
protected boolean enableStreamRequestType = false;

/**
* Enable the fault tolerance mechanism of the client sending process.
* DO NOT OPEN when ORDER messages are required.
* Turning on will interfere with the queue selection functionality,
* possibly conflicting with the order message.
*/
private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false"));
Copy link
Contributor

@fuyou001 fuyou001 Apr 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be prudent to incorporate a warning concerning order message within the document,may be order message is disturbed

private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false"));

public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
Expand Down Expand Up @@ -186,6 +199,10 @@ public void resetClientConfig(final ClientConfig cc) {
this.decodeDecompressBody = cc.decodeDecompressBody;
this.enableStreamRequestType = cc.enableStreamRequestType;
this.useHeartbeatV2 = cc.useHeartbeatV2;
this.startDetectorEnable = cc.startDetectorEnable;
this.sendLatencyEnable = cc.sendLatencyEnable;
this.detectInterval = cc.detectInterval;
this.detectTimeout = cc.detectTimeout;
}

public ClientConfig cloneClientConfig() {
Expand All @@ -210,6 +227,10 @@ public ClientConfig cloneClientConfig() {
cc.decodeDecompressBody = decodeDecompressBody;
cc.enableStreamRequestType = enableStreamRequestType;
cc.useHeartbeatV2 = useHeartbeatV2;
cc.startDetectorEnable = startDetectorEnable;
cc.sendLatencyEnable = sendLatencyEnable;
cc.detectInterval = detectInterval;
cc.detectTimeout = detectTimeout;
return cc;
}

Expand Down Expand Up @@ -381,6 +402,38 @@ public void setEnableStreamRequestType(boolean enableStreamRequestType) {
this.enableStreamRequestType = enableStreamRequestType;
}

public boolean isSendLatencyEnable() {
return sendLatencyEnable;
}

public void setSendLatencyEnable(boolean sendLatencyEnable) {
this.sendLatencyEnable = sendLatencyEnable;
}

public boolean isStartDetectorEnable() {
return startDetectorEnable;
}

public void setStartDetectorEnable(boolean startDetectorEnable) {
this.startDetectorEnable = startDetectorEnable;
}

public int getDetectTimeout() {
return this.detectTimeout;
}

public void setDetectTimeout(int detectTimeout) {
this.detectTimeout = detectTimeout;
}

public int getDetectInterval() {
return this.detectInterval;
}

public void setDetectInterval(int detectInterval) {
this.detectInterval = detectInterval;
}

public boolean isUseHeartbeatV2() {
return useHeartbeatV2;
}
Expand All @@ -403,6 +456,7 @@ public String toString() {
+ ", socksProxyConfig=" + socksProxyConfig + ", language=" + language.name()
+ ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout
+ ", decodeReadBody=" + decodeReadBody + ", decodeDecompressBody=" + decodeDecompressBody
+ ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable
+ ", enableStreamRequestType=" + enableStreamRequestType + ", useHeartbeatV2=" + useHeartbeatV2 + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public int incrementAndGet() {
return index & POSITIVE_MASK;
}

public void reset() {
int index = Math.abs(random.nextInt(Integer.MAX_VALUE));
if (index < 0) {
index = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code redundant

}
this.threadLocalIndex.set(index);
}

@Override
public String toString() {
return "ThreadLocalIndex{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ public void operationComplete(ResponseFuture responseFuture) {
} catch (Throwable e) {
}

producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
return;
}

Expand All @@ -684,14 +684,14 @@ public void operationComplete(ResponseFuture responseFuture) {
} catch (Throwable e) {
}

producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
Expand All @@ -711,7 +711,7 @@ public void operationComplete(ResponseFuture responseFuture) {
});
} catch (Exception ex) {
long cost = System.currentTimeMillis() - beginStartTime;
producer.updateFaultItem(brokerName, cost, true);
producer.updateFaultItem(brokerName, cost, true, false);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
Expand All @@ -735,7 +735,7 @@ private void onExceptionImpl(final String brokerName,
if (needRetry && tmp <= timesTotal) {
String retryBrokerName = brokerName;//by default, it will send to the same broker
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName, false);
retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen);
}
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -125,6 +126,12 @@ public class MQClientInstance {
private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet = new HashSet();
private final ConcurrentMap<String, Integer> brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryFetchRemoteConfigScheduledThread");
}
});
private final PullMessageService pullMessageService;
private final RebalanceService rebalanceService;
private final DefaultMQProducer defaultMQProducer;
Expand Down
Loading
Loading