diff --git a/pom.xml b/pom.xml index 7554534..1f450c7 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ com.maihaoche spring-boot-starter-rocketmq - 0.0.3 + 0.0.4 https://github.com/maihaoche/rocketmq-spring-boot-starter @@ -67,6 +67,11 @@ rocketmq-client ${rocketmq.version} + + org.springframework.boot + spring-boot-configuration-processor + true + diff --git a/src/main/java/com/maihaoche/starter/mq/annotation/MQKey.java b/src/main/java/com/maihaoche/starter/mq/annotation/MQKey.java new file mode 100644 index 0000000..e4d7b5a --- /dev/null +++ b/src/main/java/com/maihaoche/starter/mq/annotation/MQKey.java @@ -0,0 +1,19 @@ +package com.maihaoche.starter.mq.annotation; + +import java.lang.annotation.*; + +/** + * + * 用来标识作为消息key的字段 + * prefix 会作为前缀拼到字段值前面 + * + * @since 0.0.4 + * @author suclogger + * + */ +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface MQKey { + String prefix() default ""; +} diff --git a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java index 1987c97..95a325d 100644 --- a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java +++ b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java @@ -2,6 +2,7 @@ import com.google.gson.Gson; import com.maihaoche.starter.mq.MQException; +import com.maihaoche.starter.mq.annotation.MQKey; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -14,6 +15,8 @@ import org.apache.rocketmq.common.message.Message; import javax.annotation.PreDestroy; +import java.lang.annotation.Annotation; +import java.lang.reflect.Field; import java.nio.charset.Charset; /** @@ -76,6 +79,24 @@ public String getTopic() { } private Message genMessage(String topic, String tag, Object msgObj) { + String messageKey= ""; + try { + Field[] fields = msgObj.getClass().getDeclaredFields(); + for (Field field : fields) { + Annotation[] allFAnnos= field.getAnnotations(); + if(allFAnnos.length > 0) { + for (int i = 0; i < allFAnnos.length; i++) { + if(allFAnnos[i].annotationType().equals(MQKey.class)) { + field.setAccessible(true); + MQKey mqKey = MQKey.class.cast(allFAnnos[i]); + messageKey = StringUtils.isEmpty(mqKey.prefix()) ? field.get(msgObj).toString() : (mqKey.prefix() + field.get(msgObj).toString()); + } + } + } + } + } catch (Exception e) { + log.error("parse key error : {}" , e.getMessage()); + } String str = gson.toJson(msgObj); if(StringUtils.isEmpty(topic)) { if(StringUtils.isEmpty(getTopic())) { @@ -89,6 +110,9 @@ private Message genMessage(String topic, String tag, Object msgObj) { } else if (!StringUtils.isEmpty(getTag())) { message.setTags(getTag()); } + if(StringUtils.isNotEmpty(messageKey)) { + message.setKeys(messageKey); + } return message; } diff --git a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java index cd92614..17ac511 100644 --- a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java +++ b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java @@ -36,9 +36,10 @@ public AbstractMQPushConsumer() { * 继承这个方法处理消息 * * @param message 消息范型 + * @param messageKey 消息key * @return 处理结果 */ - public abstract boolean process(T message); + public abstract boolean processWithKey(String messageKey, T message); /** * 原生dealMessage方法,可以重写此方法自定义序列化和返回消费成功的相关逻辑 @@ -54,7 +55,7 @@ public ConsumeConcurrentlyStatus dealMessage(List list, ConsumeConcu } log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags()); T t = parseMessage(messageExt); - if( null != t && !process(t)) { + if( null != t && !processWithKey( messageExt.getKeys(), t)) { log.warn("consume fail , ask for re-consume , msgId: {}", messageExt.getMsgId()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } @@ -76,7 +77,7 @@ public ConsumeOrderlyStatus dealMessage(List list, ConsumeOrderlyCon } log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags()); T t = parseMessage(messageExt); - if( null != t && !process(t)) { + if( null != t && !processWithKey(messageExt.getKeys(), t)) { log.warn("consume fail , ask for re-consume , msgId: {}", messageExt.getMsgId()); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } diff --git a/src/main/java/com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.java b/src/main/java/com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.java index 7225c02..c53a88a 100644 --- a/src/main/java/com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.java +++ b/src/main/java/com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.java @@ -28,6 +28,12 @@ public void init() throws Exception { // if(StringUtils.isEmpty(mqProperties.getProducerGroup())) { // throw new RuntimeException("请在配置文件中指定消息发送方group!"); // } + if(StringUtils.isEmpty(mqProperties.getProducerGroup())) { + throw new RuntimeException("producer group must be defined"); + } + if(StringUtils.isEmpty(mqProperties.getNameServerAddress())) { + throw new RuntimeException("name server address must be defined"); + } producer = new DefaultMQProducer(mqProperties.getProducerGroup()); producer.setNamesrvAddr(mqProperties.getNameServerAddress()); producer.start();