Skip to content

Commit

Permalink
1. 消息添加多tag支持
Browse files Browse the repository at this point in the history
2. 发送支持通过hash选择queue达到有序效果
3. 添加广播消费模式
  • Loading branch information
pufang committed Aug 2, 2017
1 parent add28ac commit 6dfa2b6
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 12 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

spring boot starter for RocketMQ

<p><a href="http://search.maven.org/#search%7Cga%7C1%7Ccom.maihaoche"><img src="https://maven-badges.herokuapp.com/maven-central/com.maihaoche/spring-boot-starter-rocketmq/badge.svg" alt="Maven Central" style="max-width:100%;"></a>

<a href="https://github.com/maihaoche/rocketmq-spring-boot-starter/releases"><img src="https://camo.githubusercontent.com/795f06dcbec8d5adcfadc1eb7a8ac9c7d5007fce/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f72656c656173652d646f776e6c6f61642d6f72616e67652e737667" alt="GitHub release" data-canonical-src="https://img.shields.io/badge/release-download-orange.svg" style="max-width:100%;"></a>
<p><a href="http://search.maven.org/#search%7Cga%7C1%7Ccom.maihaoche"><img src="https://maven-badges.herokuapp.com/maven-central/com.maihaoche/spring-boot-starter-rocketmq/badge.svg" alt="Maven Central" style="max-width:100%;"></a><a href="https://github.com/maihaoche/rocketmq-spring-boot-starter/releases"><img src="https://camo.githubusercontent.com/795f06dcbec8d5adcfadc1eb7a8ac9c7d5007fce/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f72656c656173652d646f776e6c6f61642d6f72616e67652e737667" alt="GitHub release" data-canonical-src="https://img.shields.io/badge/release-download-orange.svg" style="max-width:100%;"></a>

首先添加maven依赖:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
public @interface MQConsumer {
String consumerGroup();
String topic();
String tag() default "*";
String messageMode() default "CLUSTERING";
String[] tag() default {"*"};
}
121 changes: 120 additions & 1 deletion src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;

import javax.annotation.PreDestroy;
Expand All @@ -22,6 +24,8 @@ public abstract class AbstractMQProducer {

private static Gson gson = new Gson();

private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

public AbstractMQProducer() {
}

Expand Down Expand Up @@ -97,7 +101,11 @@ private Message genMessage(String topic, String tag, Object msgObj) {
*/
public void sendOneWay(String topic, String tag, Object msgObj) throws MQException {
try {
if(null == msgObj) {
return;
}
producer.sendOneway(genMessage(topic, tag, msgObj));
log.info("send onway message success : {}", msgObj);
} catch (Exception e) {
log.error("消息发送失败,topic : {}, msgObj {}", topic, msgObj);
throw new MQException("消息发送失败,topic :" + topic + ",e:" + e.getMessage());
Expand All @@ -114,6 +122,43 @@ public void sendOneWay(Object msgObj) throws MQException {
sendOneWay("", "", msgObj);
}

/**
* fire and forget 不关心消息是否送达,可以提高发送tps
*
* @param tag
* @param msgObj
* @throws MQException
*/
public void sendOneWay(String tag, Object msgObj) throws MQException {
sendOneWay("", tag, msgObj);
}


/**
* 可以保证同一个queue有序
*
* @param topic
* @param tag
* @param msgObj
* @param hashKey 用于hash后选择queue的key
*/
public void sendOneWayOrderly(String topic, String tag, Object msgObj, String hashKey) {
if(null == msgObj) {
return;
}
if(StringUtils.isEmpty(hashKey)) {
// fall back to normal
sendOneWay(topic, tag, msgObj);
}
try {
producer.sendOneway(genMessage(topic, tag, msgObj), messageQueueSelector, hashKey);
log.info("send onway message orderly success : {}", msgObj);
} catch (Exception e) {
log.error("消息发送失败,topic : {}, msgObj {}", topic, msgObj);
throw new MQException("顺序消息发送失败,topic :" + topic + ",e:" + e.getMessage());
}
}

/**
* 同步发送消息
* @param topic topic
Expand Down Expand Up @@ -144,6 +189,42 @@ public void synSend(Object msgObj) throws MQException {
synSend("", "", msgObj);
}

/**
* 同步发送消息
* @param tag 消息tag
* @param msgObj 消息体
* @throws MQException
*/
public void synSend(String tag, Object msgObj) throws MQException {
synSend("", tag, msgObj);
}

/**
* 同步发送消息
* @param topic topic
* @param tag tag
* @param msgObj 消息体
* @param hashKey 用于hash后选择queue的key
* @throws MQException
*/
public void synSendOrderly(String topic, String tag, Object msgObj, String hashKey) throws MQException {
if(null == msgObj) {
return;
}
if(StringUtils.isEmpty(hashKey)) {
// fall back to normal
synSend(topic, tag, msgObj);
}
try {
SendResult sendResult = producer.send(genMessage(topic, tag, msgObj), messageQueueSelector, hashKey);
log.info("send rocketmq message orderly ,messageId : {}", sendResult.getMsgId());
this.doAfterSynSend(sendResult);
} catch (Exception e) {
log.error("顺序消息发送失败,topic : {}, msgObj {}", topic, msgObj);
throw new MQException("顺序消息发送失败,topic :" + topic + ",e:" + e.getMessage());
}
}

/**
* 异步发送消息带tag
* @param topic topic
Expand Down Expand Up @@ -175,9 +256,47 @@ public void asynSend(Object msgObj, SendCallback sendCallback) throws MQExceptio
asynSend("", "", msgObj, sendCallback);
}

/**
* 异步发送消息不带tag和topic
* @param tag msgtag
* @param msgObj msgObj
* @param sendCallback 回调
* @throws MQException
*/
public void asynSend(String tag, Object msgObj, SendCallback sendCallback) throws MQException {
asynSend("", tag, msgObj, sendCallback);
}

/**
* 异步发送消息带tag
* @param topic topic
* @param tag tag
* @param msgObj msgObj
* @param sendCallback 回调
* @param hashKey 用于hash后选择queue的key
* @throws MQException
*/
public void asynSend(String topic, String tag, Object msgObj, SendCallback sendCallback, String hashKey) throws MQException {
if (null == msgObj) {
return;
}
if(StringUtils.isEmpty(hashKey)) {
// fall back to normal
asynSend(topic, tag, msgObj, sendCallback);
}
try {
producer.send(genMessage(topic, tag, msgObj), messageQueueSelector, hashKey, sendCallback);
log.info("send rocketmq message asyn");
} catch (Exception e) {
log.error("消息发送失败,topic : {}, msgObj {}", topic, msgObj);
throw new MQException("消息发送失败,topic :" + topic + ",e:" + e.getMessage());
}
}

/**
* 兼容buick中的方式
*
* @deprecated
* @param msgObj
* @throws MQException
*/
Expand All @@ -194,4 +313,4 @@ public void sendMessage(Object msgObj) throws MQException {
* @param sendResult 发送结果
*/
public void doAfterSynSend(SendResult sendResult) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ public AbstractMQPushConsumer() {
*/
public ConsumeConcurrentlyStatus dealMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt messageExt : list) {
if(messageExt.getReconsumeTimes() != 0) {
log.info("re-consume times: {}" , messageExt.getReconsumeTimes());
}
log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags());
T t = parseMessage(messageExt);
if( null != t && !process(t)) {
log.warn("consume fail , ask for re-consume , msgId: {}", messageExt.getMsgId());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
Expand Down Expand Up @@ -77,7 +82,9 @@ private Type getMessageType() {
if (superType instanceof ParameterizedType) {
return ((ParameterizedType) superType).getActualTypeArguments()[0];
} else {
throw new RuntimeException("Unkown parameterized type.");
// 如果没有定义泛型,解析为Object
return Object.class;
// throw new RuntimeException("Unkown parameterized type.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import com.maihaoche.starter.mq.annotation.MQConsumer;
import com.maihaoche.starter.mq.base.AbstractMQPushConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import java.util.List;
Expand Down Expand Up @@ -49,7 +50,8 @@ private void publishConsumer(String beanName, Object bean) throws Exception {
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(mqProperties.getNameServerAddress());
consumer.subscribe(topic, mqConsumer.tag());
consumer.setMessageModel(MessageModel.valueOf(mqConsumer.messageMode()));
consumer.subscribe(topic, StringUtils.join(mqConsumer.tag(),"||"));
consumer.setInstanceName(UUID.randomUUID().toString());
consumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) -> {
if(!AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass())) {
Expand All @@ -61,4 +63,4 @@ private void publishConsumer(String beanName, Object bean) throws Exception {
consumer.start();
log.info(String.format("%s is ready to subscribe message", bean.getClass().getName()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class MQProducerAutoConfiguration extends MQBaseAutoConfiguration {
@PostConstruct
public void init() throws Exception {
if(producer == null) {
if(StringUtils.isEmpty(mqProperties.getProducerGroup())) {
throw new RuntimeException("请在配置文件中指定消息发送方group!");
}
// if(StringUtils.isEmpty(mqProperties.getProducerGroup())) {
// throw new RuntimeException("请在配置文件中指定消息发送方group!");
// }
producer = new DefaultMQProducer(mqProperties.getProducerGroup());
producer.setNamesrvAddr(mqProperties.getNameServerAddress());
producer.start();
Expand Down

0 comments on commit 6dfa2b6

Please sign in to comment.