From a862afce85364c23dd05876e6573b3bff8870f29 Mon Sep 17 00:00:00 2001 From: pufang Date: Mon, 21 Aug 2017 15:56:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E7=89=88=E6=9C=AC=E5=88=B00.?= =?UTF-8?q?0.3=EF=BC=9A=201.=20=E5=8F=AF=E4=BB=A5=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=E6=9C=89=E5=BA=8F=E6=B6=88=E8=B4=B9=E6=88=96=E8=80=85?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E6=B6=88=E8=B4=B9=20consumeMode=202.=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0pull=20consumer=20=E6=94=AF=E6=8C=81=203.=20?= =?UTF-8?q?=E5=8E=9F=E7=94=9F=20consumer=E5=92=8Cproducer=20=E6=B3=A8?= =?UTF-8?q?=E5=85=A5=E5=92=8C=E9=80=8F=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- pom.xml | 2 +- .../starter/mq/annotation/MQConsumer.java | 1 + .../starter/mq/base/AbstractMQProducer.java | 74 +++++---- .../mq/base/AbstractMQPullConsumer.java | 151 ++++++++++++++++++ .../mq/base/AbstractMQPushConsumer.java | 42 ++++- .../config/MQConsumerAutoConfiguration.java | 59 +++++-- 7 files changed, 282 insertions(+), 49 deletions(-) create mode 100644 src/main/java/com/maihaoche/starter/mq/base/AbstractMQPullConsumer.java diff --git a/README.md b/README.md index 1504602..45f29fc 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ spring boot starter for RocketMQ com.maihaoche spring-boot-starter-rocketmq - 0.0.2 + 0.0.3 ``` diff --git a/pom.xml b/pom.xml index 0ddd286..7554534 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ com.maihaoche spring-boot-starter-rocketmq - 0.0.2 + 0.0.3 https://github.com/maihaoche/rocketmq-spring-boot-starter diff --git a/src/main/java/com/maihaoche/starter/mq/annotation/MQConsumer.java b/src/main/java/com/maihaoche/starter/mq/annotation/MQConsumer.java index bb47d61..e1ce9f1 100644 --- a/src/main/java/com/maihaoche/starter/mq/annotation/MQConsumer.java +++ b/src/main/java/com/maihaoche/starter/mq/annotation/MQConsumer.java @@ -16,5 +16,6 @@ String consumerGroup(); String topic(); String messageMode() default "CLUSTERING"; + String consumeMode() default "CONCURRENTLY"; String[] tag() default {"*"}; } diff --git a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java index 06ff43f..38d657d 100644 --- a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java +++ b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java @@ -2,6 +2,7 @@ import com.google.gson.Gson; import com.maihaoche.starter.mq.MQException; +import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -45,6 +46,7 @@ public void setTag(String tag) { } @Setter + @Getter private DefaultMQProducer producer; @PreDestroy @@ -75,8 +77,8 @@ public String getTopic() { private Message genMessage(String topic, String tag, Object msgObj) { String str = gson.toJson(msgObj); - if(StringUtils.isEmpty(topic)) { - if(StringUtils.isEmpty(getTopic())) { + if (StringUtils.isEmpty(topic)) { + if (StringUtils.isEmpty(getTopic())) { throw new RuntimeException("no topic defined to send this message"); } topic = getTopic(); @@ -101,14 +103,14 @@ private Message genMessage(String topic, String tag, Object msgObj) { */ public void sendOneWay(String topic, String tag, Object msgObj) throws MQException { try { - if(null == msgObj) { + if (null == msgObj) { return; } producer.sendOneway(genMessage(topic, tag, msgObj)); log.info("send onway message success : {}", msgObj); } catch (Exception e) { - log.error("消息发送失败,topic : {}, msgObj {}", topic, msgObj); - throw new MQException("消息发送失败,topic :" + topic + ",e:" + e.getMessage()); + log.error("消息发送失败,topic : {}, e {}", topic, e); + throw new MQException("消息发送失败,topic :" + topic + ",e:" + e); } } @@ -143,10 +145,10 @@ public void sendOneWay(String tag, Object msgObj) throws MQException { * @param hashKey 用于hash后选择queue的key */ public void sendOneWayOrderly(String topic, String tag, Object msgObj, String hashKey) { - if(null == msgObj) { + if (null == msgObj) { return; } - if(StringUtils.isEmpty(hashKey)) { + if (StringUtils.isEmpty(hashKey)) { // fall back to normal sendOneWay(topic, tag, msgObj); } @@ -161,14 +163,15 @@ public void sendOneWayOrderly(String topic, String tag, Object msgObj, String ha /** * 同步发送消息 + * * @param topic topic - * @param tag tag - * @param msgObj 消息体 + * @param tag tag + * @param msgObj 消息体 * @throws MQException */ public void synSend(String topic, String tag, Object msgObj) throws MQException { try { - if(null == msgObj) { + if (null == msgObj) { return; } SendResult sendResult = producer.send(genMessage(topic, tag, msgObj)); @@ -182,7 +185,8 @@ public void synSend(String topic, String tag, Object msgObj) throws MQException /** * 同步发送消息 - * @param msgObj 消息体 + * + * @param msgObj 消息体 * @throws MQException */ public void synSend(Object msgObj) throws MQException { @@ -191,8 +195,9 @@ public void synSend(Object msgObj) throws MQException { /** * 同步发送消息 - * @param tag 消息tag - * @param msgObj 消息体 + * + * @param tag 消息tag + * @param msgObj 消息体 * @throws MQException */ public void synSend(String tag, Object msgObj) throws MQException { @@ -201,17 +206,18 @@ public void synSend(String tag, Object msgObj) throws MQException { /** * 同步发送消息 - * @param topic topic - * @param tag tag + * + * @param topic topic + * @param tag tag * @param msgObj 消息体 - * @param hashKey 用于hash后选择queue的key + * @param hashKey 用于hash后选择queue的key * @throws MQException */ public void synSendOrderly(String topic, String tag, Object msgObj, String hashKey) throws MQException { - if(null == msgObj) { + if (null == msgObj) { return; } - if(StringUtils.isEmpty(hashKey)) { + if (StringUtils.isEmpty(hashKey)) { // fall back to normal synSend(topic, tag, msgObj); } @@ -227,9 +233,10 @@ public void synSendOrderly(String topic, String tag, Object msgObj, String hashK /** * 异步发送消息带tag - * @param topic topic - * @param tag tag - * @param msgObj msgObj + * + * @param topic topic + * @param tag tag + * @param msgObj msgObj * @param sendCallback 回调 * @throws MQException */ @@ -248,7 +255,8 @@ public void asynSend(String topic, String tag, Object msgObj, SendCallback sendC /** * 异步发送消息不带tag和topic - * @param msgObj msgObj + * + * @param msgObj msgObj * @param sendCallback 回调 * @throws MQException */ @@ -258,8 +266,9 @@ public void asynSend(Object msgObj, SendCallback sendCallback) throws MQExceptio /** * 异步发送消息不带tag和topic - * @param tag msgtag - * @param msgObj msgObj + * + * @param tag msgtag + * @param msgObj msgObj * @param sendCallback 回调 * @throws MQException */ @@ -269,18 +278,19 @@ public void asynSend(String tag, Object msgObj, SendCallback sendCallback) throw /** * 异步发送消息带tag - * @param topic topic - * @param tag tag - * @param msgObj msgObj + * + * @param topic topic + * @param tag tag + * @param msgObj msgObj * @param sendCallback 回调 - * @param hashKey 用于hash后选择queue的key + * @param hashKey 用于hash后选择queue的key * @throws MQException */ public void asynSend(String topic, String tag, Object msgObj, SendCallback sendCallback, String hashKey) throws MQException { if (null == msgObj) { return; } - if(StringUtils.isEmpty(hashKey)) { + if (StringUtils.isEmpty(hashKey)) { // fall back to normal asynSend(topic, tag, msgObj, sendCallback); } @@ -296,12 +306,12 @@ public void asynSend(String topic, String tag, Object msgObj, SendCallback sendC /** * 兼容buick中的方式 * - * @deprecated * @param msgObj * @throws MQException + * @deprecated please use more specific method */ public void sendMessage(Object msgObj) throws MQException { - if(StringUtils.isEmpty(getTopic())) { + if (StringUtils.isEmpty(getTopic())) { throw new MQException("如果用这种方式发送消息,请在实例中重写 getTopic() 方法返回需要发送的topic"); } sendOneWay("", "", msgObj); @@ -310,7 +320,7 @@ public void sendMessage(Object msgObj) throws MQException { /** * 重写此方法处理发送后的逻辑 * - * @param sendResult 发送结果 + * @param sendResult 发送结果 */ public void doAfterSynSend(SendResult sendResult) {} } \ No newline at end of file diff --git a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPullConsumer.java b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPullConsumer.java new file mode 100644 index 0000000..69d303d --- /dev/null +++ b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPullConsumer.java @@ -0,0 +1,151 @@ +package com.maihaoche.starter.mq.base; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.List; +import java.util.Set; + +/** + * Created by yipin on 2017/6/27. + * RocketMQ的消费者(Push模式)处理消息的接口 + */ +@Slf4j +public abstract class AbstractMQPullConsumer { + + public AbstractMQPullConsumer() { + } + + private String topic; + + private DefaultMQPullConsumer consumer; + + public DefaultMQPullConsumer getConsumer() { + return consumer; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setConsumer(DefaultMQPullConsumer consumer) { + this.consumer = consumer; + } + + public void startInner() { + new Thread(() -> { + try { + while(true) { + Set mqs = consumer.fetchMessageQueuesInBalance(topic); + try { + for (MessageQueue mq : mqs) { + SINGLE_MQ: + while (true) { + try {//阻塞的拉去消息,中止时间默认20s + long offset = consumer.fetchConsumeOffset(mq, false); + offset = offset < 0 ? 0 : offset; + PullResult pullResult = consumer.pull(mq, null, offset, 10); + switch (pullResult.getPullStatus()) { + case FOUND://pullSataus + dealMessage(pullResult.getMsgFoundList()); + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break SINGLE_MQ; + case OFFSET_ILLEGAL: + break; + default: + break; + } + consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); + } catch (Exception e) { + log.error("consume message fail , e : {}", e); + } + } + } + } catch (Exception e) { + log.error("start pull consumer fail, e : {}", e.getMessage()); + } + } + } catch (Exception e) { + log.error("start pull consumer fail, e : {}", e.getMessage()); + } + }).start(); + } + + + private static Gson gson = new Gson(); + + /** + * 继承这个方法处理消息 + * + * @param message 消息范型 + * @return + */ + public abstract void process(T message); + + /** + * 原生dealMessage方法,可以重写此方法自定义序列化和返回消费成功的相关逻辑 + * + * @param list 消息列表 + * @return + */ + public void dealMessage(List list) { + for(MessageExt messageExt : list) { + if(messageExt.getReconsumeTimes() != 0) { + log.info("re-consume times: {}" , messageExt.getReconsumeTimes()); + } + log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags()); + T t = parseMessage(messageExt); + process(t); + } + } + + /** + * 反序列化解析消息 + * + * @param message 消息体 + * @return + */ + private T parseMessage(MessageExt message) { + if (message == null || message.getBody() == null) { + return null; + } + final Type type = this.getMessageType(); + if (type instanceof Class) { + try { + Object data = gson.fromJson(new String(message.getBody()), type); + return (T) data; + } catch (JsonSyntaxException e) { + log.error("parse message json fail : {}", e.getMessage()); + } + } else { + log.warn("Parse msg error. {}", message); + } + return null; + } + + /** + * 解析消息类型 + * + * @return + */ + private Type getMessageType() { + Type superType = this.getClass().getGenericSuperclass(); + if (superType instanceof ParameterizedType) { + return ((ParameterizedType) superType).getActualTypeArguments()[0]; + } else { + // 如果没有定义泛型,解析为Object + return Object.class; +// throw new RuntimeException("Unkown parameterized type."); + } + } +} diff --git a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java index 6f4cbea..97aeab7 100644 --- a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java +++ b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java @@ -1,9 +1,15 @@ package com.maihaoche.starter.mq.base; import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.common.message.MessageExt; import java.lang.reflect.ParameterizedType; @@ -17,6 +23,10 @@ @Slf4j public abstract class AbstractMQPushConsumer { + @Getter + @Setter + private DefaultMQPushConsumer consumer; + public AbstractMQPushConsumer() { } @@ -52,6 +62,28 @@ public ConsumeConcurrentlyStatus dealMessage(List list, ConsumeConcu return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } + /** + * 原生dealMessage方法,可以重写此方法自定义序列化和返回消费成功的相关逻辑 + * + * @param list 消息列表 + * @param consumeOrderlyContext 上下文 + * @return + */ + public ConsumeOrderlyStatus dealMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { + for(MessageExt messageExt : list) { + if(messageExt.getReconsumeTimes() != 0) { + log.info("re-consume times: {}" , messageExt.getReconsumeTimes()); + } + log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags()); + T t = parseMessage(messageExt); + if( null != t && !process(t)) { + log.warn("consume fail , ask for re-consume , msgId: {}", messageExt.getMsgId()); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + } + return ConsumeOrderlyStatus.SUCCESS; + } + /** * 反序列化解析消息 * @@ -64,12 +96,16 @@ private T parseMessage(MessageExt message) { } final Type type = this.getMessageType(); if (type instanceof Class) { - Object data = gson.fromJson(new String(message.getBody()), type); - return (T) data; + try { + Object data = gson.fromJson(new String(message.getBody()), type); + return (T) data; + } catch (JsonSyntaxException e) { + log.error("parse message json fail : {}", e.getMessage()); + } } else { log.warn("Parse msg error. {}", message); - return null; } + return null; } /** diff --git a/src/main/java/com/maihaoche/starter/mq/config/MQConsumerAutoConfiguration.java b/src/main/java/com/maihaoche/starter/mq/config/MQConsumerAutoConfiguration.java index 14ecbf7..c5e495f 100644 --- a/src/main/java/com/maihaoche/starter/mq/config/MQConsumerAutoConfiguration.java +++ b/src/main/java/com/maihaoche/starter/mq/config/MQConsumerAutoConfiguration.java @@ -1,11 +1,14 @@ package com.maihaoche.starter.mq.config; import com.maihaoche.starter.mq.annotation.MQConsumer; +import com.maihaoche.starter.mq.base.AbstractMQPullConsumer; import com.maihaoche.starter.mq.base.AbstractMQPushConsumer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; @@ -34,12 +37,20 @@ public void init() throws Exception { private void publishConsumer(String beanName, Object bean) throws Exception { MQConsumer mqConsumer = applicationContext.findAnnotationOnBean(beanName, MQConsumer.class); + if(StringUtils.isEmpty(mqProperties.getNameServerAddress())) { + throw new RuntimeException("name server address must be defined"); + } if(StringUtils.isEmpty(mqConsumer.consumerGroup())) { throw new RuntimeException("consumer's consumerGroup must be defined"); } if(StringUtils.isEmpty(mqConsumer.topic())) { throw new RuntimeException("consumer's topic must be defined"); } + if(!AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass()) + && !AbstractMQPullConsumer.class.isAssignableFrom(bean.getClass())) { + throw new RuntimeException(bean.getClass().getName() + " - consumer未实现Consumer抽象类"); + } + String consumerGroup = applicationContext.getEnvironment().getProperty(mqConsumer.consumerGroup()); if(StringUtils.isEmpty(consumerGroup)) { consumerGroup = mqConsumer.consumerGroup(); @@ -48,19 +59,43 @@ private void publishConsumer(String beanName, Object bean) throws Exception { if(StringUtils.isEmpty(topic)) { topic = mqConsumer.topic(); } - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); - consumer.setNamesrvAddr(mqProperties.getNameServerAddress()); - consumer.setMessageModel(MessageModel.valueOf(mqConsumer.messageMode())); - consumer.subscribe(topic, StringUtils.join(mqConsumer.tag(),"||")); - consumer.setInstanceName(UUID.randomUUID().toString()); - consumer.registerMessageListener((List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) -> { - if(!AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass())) { - throw new RuntimeException(bean.getClass().getName() + " - consumer未实现IMQPushConsumer接口"); - } + + // 配置push consumer + if(AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass())) { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); + consumer.setNamesrvAddr(mqProperties.getNameServerAddress()); + consumer.setMessageModel(MessageModel.valueOf(mqConsumer.messageMode())); + consumer.subscribe(topic, StringUtils.join(mqConsumer.tag(),"||")); + consumer.setInstanceName(UUID.randomUUID().toString()); AbstractMQPushConsumer abstractMQPushConsumer = (AbstractMQPushConsumer) bean; - return abstractMQPushConsumer.dealMessage(list, consumeConcurrentlyContext); - }); - consumer.start(); + if(mqConsumer.consumeMode().equals("CONCURRENTLY")) { + consumer.registerMessageListener((List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) -> + abstractMQPushConsumer.dealMessage(list, consumeConcurrentlyContext)); + } else if(mqConsumer.consumeMode().equals("ORDERLY")) { + consumer.registerMessageListener((List list, ConsumeOrderlyContext consumeOrderlyContext) -> + abstractMQPushConsumer.dealMessage(list, consumeOrderlyContext)); + } else { + throw new RuntimeException("unknown consume mode ! only support CONCURRENTLY and ORDERLY"); + } + abstractMQPushConsumer.setConsumer(consumer); + consumer.start(); + } else if (AbstractMQPullConsumer.class.isAssignableFrom(bean.getClass())) { + + // 配置pull consumer + + AbstractMQPullConsumer abstractMQPullConsumer = AbstractMQPullConsumer.class.cast(bean); + + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup); + consumer.setNamesrvAddr(mqProperties.getNameServerAddress()); + consumer.setMessageModel(MessageModel.valueOf(mqConsumer.messageMode())); + consumer.setInstanceName(UUID.randomUUID().toString()); + consumer.start(); + + abstractMQPullConsumer.setTopic(topic); + abstractMQPullConsumer.setConsumer(consumer); + abstractMQPullConsumer.startInner(); + } + log.info(String.format("%s is ready to subscribe message", bean.getClass().getName())); } } \ No newline at end of file