Skip to content

Commit

Permalink
[ISSUE apache#6215]make benchmark cover compress msg situation (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
humkum authored and miles-ton committed Mar 16, 2023
1 parent 14fd318 commit eacbd30
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
Expand Down Expand Up @@ -72,9 +73,11 @@ public static void main(String[] args) throws MQClientException {
final int tagCount = getOptionValue(commandLine, 'l', 0);
final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
final boolean aclEnable = getOptionValue(commandLine, 'a', false);
final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));

System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, aclEnable: %s%n",
topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " +
"aclEnable: %s%n compressEnable: %s%n",
topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress);

StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
Expand All @@ -93,6 +96,19 @@ public static void main(String[] args) throws MQClientException {
}

final DefaultMQProducer producer = initInstance(namesrv, msgTraceEnable, rpcHook);

if (enableCompress) {
String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
} else {
producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
}

producer.start();

final Logger logger = LoggerFactory.getLogger(BatchProducer.class);
Expand Down Expand Up @@ -220,6 +236,23 @@ public static Options buildCommandlineOptions(final Options options) {
opt = new Option("n", "namesrv", true, "name server, Default: 127.0.0.1:9876");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("c", "compressEnable", true, "Enable compress msg over 4K, Default: false");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("ct", "compressType", true, "Message compressed type, Default: ZLIB");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("cl", "compressLevel", true, "Message compressed level, Default: 5");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("ch", "compressOverHowMuch", true, "Compress message when body over how much(unit Byte), Default: 4096");
opt.setRequired(false);
options.addOption(opt);

return options;
}

Expand Down Expand Up @@ -303,7 +336,6 @@ private static DefaultMQProducer initInstance(String namesrv, boolean traceEnabl
producer.setInstanceName(Long.toString(System.currentTimeMillis()));

producer.setNamesrvAddr(namesrv);
producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
return producer;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
Expand Down Expand Up @@ -80,12 +81,13 @@ public static void main(String[] args) throws MQClientException {
final int delayLevel = commandLine.hasOption('e') ? Integer.parseInt(commandLine.getOptionValue('e')) : 1;
final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));

System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " +
"traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " +
"asyncEnable: %s%n",
"asyncEnable: %s%n compressEnable: %s%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
delayEnable, delayLevel, asyncEnable);
delayEnable, delayLevel, asyncEnable, enableCompress);

StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
Expand Down Expand Up @@ -153,7 +155,17 @@ public void run() {
producer.setNamesrvAddr(ns);
}

producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
if (enableCompress) {
String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
} else {
producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
}

producer.start();

Expand Down Expand Up @@ -342,6 +354,22 @@ public static Options buildCommandlineOptions(final Options options) {
opt.setRequired(false);
options.addOption(opt);

opt = new Option("c", "compressEnable", true, "Enable compress msg over 4K, Default: false");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("ct", "compressType", true, "Message compressed type, Default: ZLIB");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("cl", "compressLevel", true, "Message compressed level, Default: 5");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("ch", "compressOverHowMuch", true, "Compress message when body over how much(unit Byte), Default: 4096");
opt.setRequired(false);
options.addOption(opt);

return options;
}

Expand Down

0 comments on commit eacbd30

Please sign in to comment.