Skip to content

Commit

Permalink
更新版本到0.0.3:
Browse files Browse the repository at this point in the history
1. 可以自定义有序消费或者并发消费 consumeMode
2. 添加pull consumer 支持
3. 原生 consumer和producer 注入和透出
  • Loading branch information
pufang committed Aug 21, 2017
1 parent 3caa523 commit a862afc
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 49 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ spring boot starter for RocketMQ
<dependency>
<groupId>com.maihaoche</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

<groupId>com.maihaoche</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>

<scm>
<url>https://github.com/maihaoche/rocketmq-spring-boot-starter</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
String consumerGroup();
String topic();
String messageMode() default "CLUSTERING";
String consumeMode() default "CONCURRENTLY";
String[] tag() default {"*"};
}
74 changes: 42 additions & 32 deletions src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.gson.Gson;
import com.maihaoche.starter.mq.MQException;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -45,6 +46,7 @@ public void setTag(String tag) {
}

@Setter
@Getter
private DefaultMQProducer producer;

@PreDestroy
Expand Down Expand Up @@ -75,8 +77,8 @@ public String getTopic() {

private Message genMessage(String topic, String tag, Object msgObj) {
String str = gson.toJson(msgObj);
if(StringUtils.isEmpty(topic)) {
if(StringUtils.isEmpty(getTopic())) {
if (StringUtils.isEmpty(topic)) {
if (StringUtils.isEmpty(getTopic())) {
throw new RuntimeException("no topic defined to send this message");
}
topic = getTopic();
Expand All @@ -101,14 +103,14 @@ private Message genMessage(String topic, String tag, Object msgObj) {
*/
public void sendOneWay(String topic, String tag, Object msgObj) throws MQException {
try {
if(null == msgObj) {
if (null == msgObj) {
return;
}
producer.sendOneway(genMessage(topic, tag, msgObj));
log.info("send onway message success : {}", msgObj);
} catch (Exception e) {
log.error("消息发送失败,topic : {}, msgObj {}", topic, msgObj);
throw new MQException("消息发送失败,topic :" + topic + ",e:" + e.getMessage());
log.error("消息发送失败,topic : {}, e {}", topic, e);
throw new MQException("消息发送失败,topic :" + topic + ",e:" + e);
}
}

Expand Down Expand Up @@ -143,10 +145,10 @@ public void sendOneWay(String tag, Object msgObj) throws MQException {
* @param hashKey 用于hash后选择queue的key
*/
public void sendOneWayOrderly(String topic, String tag, Object msgObj, String hashKey) {
if(null == msgObj) {
if (null == msgObj) {
return;
}
if(StringUtils.isEmpty(hashKey)) {
if (StringUtils.isEmpty(hashKey)) {
// fall back to normal
sendOneWay(topic, tag, msgObj);
}
Expand All @@ -161,14 +163,15 @@ public void sendOneWayOrderly(String topic, String tag, Object msgObj, String ha

/**
* 同步发送消息
*
* @param topic topic
* @param tag tag
* @param msgObj 消息体
* @param tag tag
* @param msgObj 消息体
* @throws MQException
*/
public void synSend(String topic, String tag, Object msgObj) throws MQException {
try {
if(null == msgObj) {
if (null == msgObj) {
return;
}
SendResult sendResult = producer.send(genMessage(topic, tag, msgObj));
Expand All @@ -182,7 +185,8 @@ public void synSend(String topic, String tag, Object msgObj) throws MQException

/**
* 同步发送消息
* @param msgObj 消息体
*
* @param msgObj 消息体
* @throws MQException
*/
public void synSend(Object msgObj) throws MQException {
Expand All @@ -191,8 +195,9 @@ public void synSend(Object msgObj) throws MQException {

/**
* 同步发送消息
* @param tag 消息tag
* @param msgObj 消息体
*
* @param tag 消息tag
* @param msgObj 消息体
* @throws MQException
*/
public void synSend(String tag, Object msgObj) throws MQException {
Expand All @@ -201,17 +206,18 @@ public void synSend(String tag, Object msgObj) throws MQException {

/**
* 同步发送消息
* @param topic topic
* @param tag tag
*
* @param topic topic
* @param tag tag
* @param msgObj 消息体
* @param hashKey 用于hash后选择queue的key
* @param hashKey 用于hash后选择queue的key
* @throws MQException
*/
public void synSendOrderly(String topic, String tag, Object msgObj, String hashKey) throws MQException {
if(null == msgObj) {
if (null == msgObj) {
return;
}
if(StringUtils.isEmpty(hashKey)) {
if (StringUtils.isEmpty(hashKey)) {
// fall back to normal
synSend(topic, tag, msgObj);
}
Expand All @@ -227,9 +233,10 @@ public void synSendOrderly(String topic, String tag, Object msgObj, String hashK

/**
* 异步发送消息带tag
* @param topic topic
* @param tag tag
* @param msgObj msgObj
*
* @param topic topic
* @param tag tag
* @param msgObj msgObj
* @param sendCallback 回调
* @throws MQException
*/
Expand All @@ -248,7 +255,8 @@ public void asynSend(String topic, String tag, Object msgObj, SendCallback sendC

/**
* 异步发送消息不带tag和topic
* @param msgObj msgObj
*
* @param msgObj msgObj
* @param sendCallback 回调
* @throws MQException
*/
Expand All @@ -258,8 +266,9 @@ public void asynSend(Object msgObj, SendCallback sendCallback) throws MQExceptio

/**
* 异步发送消息不带tag和topic
* @param tag msgtag
* @param msgObj msgObj
*
* @param tag msgtag
* @param msgObj msgObj
* @param sendCallback 回调
* @throws MQException
*/
Expand All @@ -269,18 +278,19 @@ public void asynSend(String tag, Object msgObj, SendCallback sendCallback) throw

/**
* 异步发送消息带tag
* @param topic topic
* @param tag tag
* @param msgObj msgObj
*
* @param topic topic
* @param tag tag
* @param msgObj msgObj
* @param sendCallback 回调
* @param hashKey 用于hash后选择queue的key
* @param hashKey 用于hash后选择queue的key
* @throws MQException
*/
public void asynSend(String topic, String tag, Object msgObj, SendCallback sendCallback, String hashKey) throws MQException {
if (null == msgObj) {
return;
}
if(StringUtils.isEmpty(hashKey)) {
if (StringUtils.isEmpty(hashKey)) {
// fall back to normal
asynSend(topic, tag, msgObj, sendCallback);
}
Expand All @@ -296,12 +306,12 @@ public void asynSend(String topic, String tag, Object msgObj, SendCallback sendC
/**
* 兼容buick中的方式
*
* @deprecated
* @param msgObj
* @throws MQException
* @deprecated please use more specific method
*/
public void sendMessage(Object msgObj) throws MQException {
if(StringUtils.isEmpty(getTopic())) {
if (StringUtils.isEmpty(getTopic())) {
throw new MQException("如果用这种方式发送消息,请在实例中重写 getTopic() 方法返回需要发送的topic");
}
sendOneWay("", "", msgObj);
Expand All @@ -310,7 +320,7 @@ public void sendMessage(Object msgObj) throws MQException {
/**
* 重写此方法处理发送后的逻辑
*
* @param sendResult 发送结果
* @param sendResult 发送结果
*/
public void doAfterSynSend(SendResult sendResult) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.maihaoche.starter.mq.base;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Set;

/**
* Created by yipin on 2017/6/27.
* RocketMQ的消费者(Push模式)处理消息的接口
*/
@Slf4j
public abstract class AbstractMQPullConsumer<T> {

public AbstractMQPullConsumer() {
}

private String topic;

private DefaultMQPullConsumer consumer;

public DefaultMQPullConsumer getConsumer() {
return consumer;
}

public void setTopic(String topic) {
this.topic = topic;
}

public void setConsumer(DefaultMQPullConsumer consumer) {
this.consumer = consumer;
}

public void startInner() {
new Thread(() -> {
try {
while(true) {
Set<MessageQueue> mqs = consumer.fetchMessageQueuesInBalance(topic);
try {
for (MessageQueue mq : mqs) {
SINGLE_MQ:
while (true) {
try {//阻塞的拉去消息,中止时间默认20s
long offset = consumer.fetchConsumeOffset(mq, false);
offset = offset < 0 ? 0 : offset;
PullResult pullResult = consumer.pull(mq, null, offset, 10);
switch (pullResult.getPullStatus()) {
case FOUND://pullSataus
dealMessage(pullResult.getMsgFoundList());
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
} catch (Exception e) {
log.error("consume message fail , e : {}", e);
}
}
}
} catch (Exception e) {
log.error("start pull consumer fail, e : {}", e.getMessage());
}
}
} catch (Exception e) {
log.error("start pull consumer fail, e : {}", e.getMessage());
}
}).start();
}


private static Gson gson = new Gson();

/**
* 继承这个方法处理消息
*
* @param message 消息范型
* @return
*/
public abstract void process(T message);

/**
* 原生dealMessage方法,可以重写此方法自定义序列化和返回消费成功的相关逻辑
*
* @param list 消息列表
* @return
*/
public void dealMessage(List<MessageExt> list) {
for(MessageExt messageExt : list) {
if(messageExt.getReconsumeTimes() != 0) {
log.info("re-consume times: {}" , messageExt.getReconsumeTimes());
}
log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags());
T t = parseMessage(messageExt);
process(t);
}
}

/**
* 反序列化解析消息
*
* @param message 消息体
* @return
*/
private T parseMessage(MessageExt message) {
if (message == null || message.getBody() == null) {
return null;
}
final Type type = this.getMessageType();
if (type instanceof Class) {
try {
Object data = gson.fromJson(new String(message.getBody()), type);
return (T) data;
} catch (JsonSyntaxException e) {
log.error("parse message json fail : {}", e.getMessage());
}
} else {
log.warn("Parse msg error. {}", message);
}
return null;
}

/**
* 解析消息类型
*
* @return
*/
private Type getMessageType() {
Type superType = this.getClass().getGenericSuperclass();
if (superType instanceof ParameterizedType) {
return ((ParameterizedType) superType).getActualTypeArguments()[0];
} else {
// 如果没有定义泛型,解析为Object
return Object.class;
// throw new RuntimeException("Unkown parameterized type.");
}
}
}
Loading

0 comments on commit a862afc

Please sign in to comment.