Skip to content

Commit

Permalink
release 4.0:
Browse files Browse the repository at this point in the history
1. 添加消息key支持
2. 消息消费强制接收key参数
3. 引用者可以在配置文件中看到提示 by ybwei
  • Loading branch information
pufang committed Aug 23, 2017
1 parent 39260b7 commit 411c77f
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 4 deletions.
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

<groupId>com.maihaoche</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>0.0.3</version>
<version>0.0.4</version>

<scm>
<url>https://github.com/maihaoche/rocketmq-spring-boot-starter</url>
Expand Down Expand Up @@ -67,6 +67,11 @@
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/maihaoche/starter/mq/annotation/MQKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.maihaoche.starter.mq.annotation;

import java.lang.annotation.*;

/**
*
* 用来标识作为消息key的字段
* prefix 会作为前缀拼到字段值前面
*
* @since 0.0.4
* @author suclogger
*
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MQKey {
String prefix() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.gson.Gson;
import com.maihaoche.starter.mq.MQException;
import com.maihaoche.starter.mq.annotation.MQKey;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -14,6 +15,8 @@
import org.apache.rocketmq.common.message.Message;

import javax.annotation.PreDestroy;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.nio.charset.Charset;

/**
Expand Down Expand Up @@ -76,6 +79,24 @@ public String getTopic() {
}

private Message genMessage(String topic, String tag, Object msgObj) {
String messageKey= "";
try {
Field[] fields = msgObj.getClass().getDeclaredFields();
for (Field field : fields) {
Annotation[] allFAnnos= field.getAnnotations();
if(allFAnnos.length > 0) {
for (int i = 0; i < allFAnnos.length; i++) {
if(allFAnnos[i].annotationType().equals(MQKey.class)) {
field.setAccessible(true);
MQKey mqKey = MQKey.class.cast(allFAnnos[i]);
messageKey = StringUtils.isEmpty(mqKey.prefix()) ? field.get(msgObj).toString() : (mqKey.prefix() + field.get(msgObj).toString());
}
}
}
}
} catch (Exception e) {
log.error("parse key error : {}" , e.getMessage());
}
String str = gson.toJson(msgObj);
if(StringUtils.isEmpty(topic)) {
if(StringUtils.isEmpty(getTopic())) {
Expand All @@ -89,6 +110,9 @@ private Message genMessage(String topic, String tag, Object msgObj) {
} else if (!StringUtils.isEmpty(getTag())) {
message.setTags(getTag());
}
if(StringUtils.isNotEmpty(messageKey)) {
message.setKeys(messageKey);
}
return message;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public AbstractMQPushConsumer() {
* 继承这个方法处理消息
*
* @param message 消息范型
* @param messageKey 消息key
* @return 处理结果
*/
public abstract boolean process(T message);
public abstract boolean processWithKey(String messageKey, T message);

/**
* 原生dealMessage方法,可以重写此方法自定义序列化和返回消费成功的相关逻辑
Expand All @@ -54,7 +55,7 @@ public ConsumeConcurrentlyStatus dealMessage(List<MessageExt> list, ConsumeConcu
}
log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags());
T t = parseMessage(messageExt);
if( null != t && !process(t)) {
if( null != t && !processWithKey( messageExt.getKeys(), t)) {
log.warn("consume fail , ask for re-consume , msgId: {}", messageExt.getMsgId());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
Expand All @@ -76,7 +77,7 @@ public ConsumeOrderlyStatus dealMessage(List<MessageExt> list, ConsumeOrderlyCon
}
log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags());
T t = parseMessage(messageExt);
if( null != t && !process(t)) {
if( null != t && !processWithKey(messageExt.getKeys(), t)) {
log.warn("consume fail , ask for re-consume , msgId: {}", messageExt.getMsgId());
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public void init() throws Exception {
// if(StringUtils.isEmpty(mqProperties.getProducerGroup())) {
// throw new RuntimeException("请在配置文件中指定消息发送方group!");
// }
if(StringUtils.isEmpty(mqProperties.getProducerGroup())) {
throw new RuntimeException("producer group must be defined");
}
if(StringUtils.isEmpty(mqProperties.getNameServerAddress())) {
throw new RuntimeException("name server address must be defined");
}
producer = new DefaultMQProducer(mqProperties.getProducerGroup());
producer.setNamesrvAddr(mqProperties.getNameServerAddress());
producer.start();
Expand Down

0 comments on commit 411c77f

Please sign in to comment.