From 6dfa2b6161495624976cb74cbd02230af37b8e52 Mon Sep 17 00:00:00 2001 From: pufang Date: Wed, 2 Aug 2017 10:28:21 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E6=B6=88=E6=81=AF=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=A4=9Atag=E6=94=AF=E6=8C=81=202.=20=E5=8F=91=E9=80=81?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E9=80=9A=E8=BF=87hash=E9=80=89=E6=8B=A9queue?= =?UTF-8?q?=E8=BE=BE=E5=88=B0=E6=9C=89=E5=BA=8F=E6=95=88=E6=9E=9C=203.=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=B9=BF=E6=92=AD=E6=B6=88=E8=B4=B9=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 +- .../starter/mq/annotation/MQConsumer.java | 3 +- .../starter/mq/base/AbstractMQProducer.java | 121 +++++++++++++++++- .../mq/base/AbstractMQPushConsumer.java | 9 +- .../config/MQConsumerAutoConfiguration.java | 8 +- .../config/MQProducerAutoConfiguration.java | 6 +- 6 files changed, 139 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index c2cdb63..1bdae0a 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,7 @@ spring boot starter for RocketMQ -

Maven Central - -GitHub release +

Maven CentralGitHub release 首先添加maven依赖: diff --git a/src/main/java/com/maihaoche/starter/mq/annotation/MQConsumer.java b/src/main/java/com/maihaoche/starter/mq/annotation/MQConsumer.java index 9b814e1..bb47d61 100644 --- a/src/main/java/com/maihaoche/starter/mq/annotation/MQConsumer.java +++ b/src/main/java/com/maihaoche/starter/mq/annotation/MQConsumer.java @@ -15,5 +15,6 @@ public @interface MQConsumer { String consumerGroup(); String topic(); - String tag() default "*"; + String messageMode() default "CLUSTERING"; + String[] tag() default {"*"}; } diff --git a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java index f35a261..06ff43f 100644 --- a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java +++ b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQProducer.java @@ -6,8 +6,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash; import org.apache.rocketmq.common.message.Message; import javax.annotation.PreDestroy; @@ -22,6 +24,8 @@ public abstract class AbstractMQProducer { private static Gson gson = new Gson(); + private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash(); + public AbstractMQProducer() { } @@ -97,7 +101,11 @@ private Message genMessage(String topic, String tag, Object msgObj) { */ public void sendOneWay(String topic, String tag, Object msgObj) throws MQException { try { + if(null == msgObj) { + return; + } producer.sendOneway(genMessage(topic, tag, msgObj)); + log.info("send onway message success : {}", msgObj); } catch (Exception e) { log.error("消息发送失败,topic : {}, msgObj {}", topic, msgObj); throw new MQException("消息发送失败,topic :" + topic + ",e:" + e.getMessage()); @@ -114,6 +122,43 @@ public void sendOneWay(Object msgObj) throws MQException { sendOneWay("", "", msgObj); } + /** + * fire and forget 不关心消息是否送达,可以提高发送tps + * + * @param tag + * @param msgObj + * @throws MQException + */ + public void sendOneWay(String tag, Object msgObj) throws MQException { + sendOneWay("", tag, msgObj); + } + + + /** + * 可以保证同一个queue有序 + * + * @param topic + * @param tag + * @param msgObj + * @param hashKey 用于hash后选择queue的key + */ + public void sendOneWayOrderly(String topic, String tag, Object msgObj, String hashKey) { + if(null == msgObj) { + return; + } + if(StringUtils.isEmpty(hashKey)) { + // fall back to normal + sendOneWay(topic, tag, msgObj); + } + try { + producer.sendOneway(genMessage(topic, tag, msgObj), messageQueueSelector, hashKey); + log.info("send onway message orderly success : {}", msgObj); + } catch (Exception e) { + log.error("消息发送失败,topic : {}, msgObj {}", topic, msgObj); + throw new MQException("顺序消息发送失败,topic :" + topic + ",e:" + e.getMessage()); + } + } + /** * 同步发送消息 * @param topic topic @@ -144,6 +189,42 @@ public void synSend(Object msgObj) throws MQException { synSend("", "", msgObj); } + /** + * 同步发送消息 + * @param tag 消息tag + * @param msgObj 消息体 + * @throws MQException + */ + public void synSend(String tag, Object msgObj) throws MQException { + synSend("", tag, msgObj); + } + + /** + * 同步发送消息 + * @param topic topic + * @param tag tag + * @param msgObj 消息体 + * @param hashKey 用于hash后选择queue的key + * @throws MQException + */ + public void synSendOrderly(String topic, String tag, Object msgObj, String hashKey) throws MQException { + if(null == msgObj) { + return; + } + if(StringUtils.isEmpty(hashKey)) { + // fall back to normal + synSend(topic, tag, msgObj); + } + try { + SendResult sendResult = producer.send(genMessage(topic, tag, msgObj), messageQueueSelector, hashKey); + log.info("send rocketmq message orderly ,messageId : {}", sendResult.getMsgId()); + this.doAfterSynSend(sendResult); + } catch (Exception e) { + log.error("顺序消息发送失败,topic : {}, msgObj {}", topic, msgObj); + throw new MQException("顺序消息发送失败,topic :" + topic + ",e:" + e.getMessage()); + } + } + /** * 异步发送消息带tag * @param topic topic @@ -175,9 +256,47 @@ public void asynSend(Object msgObj, SendCallback sendCallback) throws MQExceptio asynSend("", "", msgObj, sendCallback); } + /** + * 异步发送消息不带tag和topic + * @param tag msgtag + * @param msgObj msgObj + * @param sendCallback 回调 + * @throws MQException + */ + public void asynSend(String tag, Object msgObj, SendCallback sendCallback) throws MQException { + asynSend("", tag, msgObj, sendCallback); + } + + /** + * 异步发送消息带tag + * @param topic topic + * @param tag tag + * @param msgObj msgObj + * @param sendCallback 回调 + * @param hashKey 用于hash后选择queue的key + * @throws MQException + */ + public void asynSend(String topic, String tag, Object msgObj, SendCallback sendCallback, String hashKey) throws MQException { + if (null == msgObj) { + return; + } + if(StringUtils.isEmpty(hashKey)) { + // fall back to normal + asynSend(topic, tag, msgObj, sendCallback); + } + try { + producer.send(genMessage(topic, tag, msgObj), messageQueueSelector, hashKey, sendCallback); + log.info("send rocketmq message asyn"); + } catch (Exception e) { + log.error("消息发送失败,topic : {}, msgObj {}", topic, msgObj); + throw new MQException("消息发送失败,topic :" + topic + ",e:" + e.getMessage()); + } + } + /** * 兼容buick中的方式 * + * @deprecated * @param msgObj * @throws MQException */ @@ -194,4 +313,4 @@ public void sendMessage(Object msgObj) throws MQException { * @param sendResult 发送结果 */ public void doAfterSynSend(SendResult sendResult) {} -} +} \ No newline at end of file diff --git a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java index f305e3f..6f4cbea 100644 --- a/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java +++ b/src/main/java/com/maihaoche/starter/mq/base/AbstractMQPushConsumer.java @@ -39,8 +39,13 @@ public AbstractMQPushConsumer() { */ public ConsumeConcurrentlyStatus dealMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt messageExt : list) { + if(messageExt.getReconsumeTimes() != 0) { + log.info("re-consume times: {}" , messageExt.getReconsumeTimes()); + } + log.info("receive msgId: {}, tags : {}" , messageExt.getMsgId(), messageExt.getTags()); T t = parseMessage(messageExt); if( null != t && !process(t)) { + log.warn("consume fail , ask for re-consume , msgId: {}", messageExt.getMsgId()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } @@ -77,7 +82,9 @@ private Type getMessageType() { if (superType instanceof ParameterizedType) { return ((ParameterizedType) superType).getActualTypeArguments()[0]; } else { - throw new RuntimeException("Unkown parameterized type."); + // 如果没有定义泛型,解析为Object + return Object.class; +// throw new RuntimeException("Unkown parameterized type."); } } } diff --git a/src/main/java/com/maihaoche/starter/mq/config/MQConsumerAutoConfiguration.java b/src/main/java/com/maihaoche/starter/mq/config/MQConsumerAutoConfiguration.java index ea0c6d5..14ecbf7 100644 --- a/src/main/java/com/maihaoche/starter/mq/config/MQConsumerAutoConfiguration.java +++ b/src/main/java/com/maihaoche/starter/mq/config/MQConsumerAutoConfiguration.java @@ -3,12 +3,13 @@ import com.maihaoche.starter.mq.annotation.MQConsumer; import com.maihaoche.starter.mq.base.AbstractMQPushConsumer; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Configuration; -import org.springframework.util.StringUtils; import javax.annotation.PostConstruct; import java.util.List; @@ -49,7 +50,8 @@ private void publishConsumer(String beanName, Object bean) throws Exception { } DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(mqProperties.getNameServerAddress()); - consumer.subscribe(topic, mqConsumer.tag()); + consumer.setMessageModel(MessageModel.valueOf(mqConsumer.messageMode())); + consumer.subscribe(topic, StringUtils.join(mqConsumer.tag(),"||")); consumer.setInstanceName(UUID.randomUUID().toString()); consumer.registerMessageListener((List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) -> { if(!AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass())) { @@ -61,4 +63,4 @@ private void publishConsumer(String beanName, Object bean) throws Exception { consumer.start(); log.info(String.format("%s is ready to subscribe message", bean.getClass().getName())); } -} +} \ No newline at end of file diff --git a/src/main/java/com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.java b/src/main/java/com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.java index 8765a62..7225c02 100644 --- a/src/main/java/com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.java +++ b/src/main/java/com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.java @@ -25,9 +25,9 @@ public class MQProducerAutoConfiguration extends MQBaseAutoConfiguration { @PostConstruct public void init() throws Exception { if(producer == null) { - if(StringUtils.isEmpty(mqProperties.getProducerGroup())) { - throw new RuntimeException("请在配置文件中指定消息发送方group!"); - } +// if(StringUtils.isEmpty(mqProperties.getProducerGroup())) { +// throw new RuntimeException("请在配置文件中指定消息发送方group!"); +// } producer = new DefaultMQProducer(mqProperties.getProducerGroup()); producer.setNamesrvAddr(mqProperties.getNameServerAddress()); producer.start();