LMQ采用的读放大的策略,写一份数据,多个LMQ队列分发, 因为存储的成本和效率对用户的体感最明显。写多份不仅加大了存储成本,同时也对性能和数据准确一致性提出了挑战。
上图描述的是LMQ的队列存储模型,消息可以来自各个接入场景 (如服务端的MQ/AMQP,客户端的MQTT),但只会写一份存到commitlog里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级Topic队列进行传统的服务端消费,客户端MQTT场景可以按照MQTT多级Topic(也即 LMQ)进行消费消息。
broker.conf文件需要增加以下的配置项,开启LMQ开关,这样就可以识别LMQ相关属性的消息,进行原子分发消息到LMQ队列
enableLmq = true
enableMultiDispatch = true
发送消息的时候通过设置 INNER_MULTI_DISPATCH 属性,LMQ queue使用逗号分割,queue前缀必须是 %LMQ%,这样broker就可以识别LMQ queue. 以下代码只是demo伪代码 具体逻辑参照执行即可
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
producer.start();
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* INNER_MULTI_DISPATCH property and PREFIX must start as "%LMQ%",
* If it is multiple LMQ, need to use “,” split
*/
message.putUserProperty("INNER_MULTI_DISPATCH", "%LMQ%123,%LMQ%456");
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
LMQ queue在每个broker上只有一个queue,也即queueId为0, 指明轻量级的MessageQueue,就可以拉取消息进行消费。 以下代码只是demo伪代码 具体逻辑参照执行即可
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer();
defaultMQPullConsumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
defaultMQPullConsumer.setVipChannelEnabled(false);
defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST");
defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST");
defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList("TopicTest")));
defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(2000);
defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(3000);
defaultMQPullConsumer.start();
String brokerName = "set broker Name";
MessageQueue mq = new MessageQueue("%LMQ%123", brokerName, 0);
defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer("TopicTest");
Thread.sleep(30000);
Long offset = defaultMQPullConsumer.maxOffset(mq);
defaultMQPullConsumer.pullBlockIfNotFound(
mq, "*", offset, 32,
new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
List<MessageExt> list = pullResult.getMsgFoundList();
if (list == null || list.isEmpty()) {
return;
}
for (MessageExt messageExt : list) {
System.out.println(messageExt);
}
}
@Override
public void onException(Throwable e) {
}
});