Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于消息幂等的处理 #2

Closed
jaychang9 opened this issue Sep 20, 2017 · 8 comments
Closed

关于消息幂等的处理 #2

jaychang9 opened this issue Sep 20, 2017 · 8 comments
Milestone

Comments

@jaychang9
Copy link

由于rocketmq-spring-boot-starter为我们处理了消息Body的解析工作,但是导致messageExt对象对我们而言是不可见的了,之前是设想使用messageExt.getBodyCRC()来处理消息幂等性。那现在我能想到的是用md5hash(JSON.toJSONString(message)),将该哈希值作为key存入redis,并设置有效时间比如10分钟。当第一次消费的时候就将该值存到redis;如果有重复消费的情况时,当发现redis中已经有存在该key则忽略此条消息,来防止重复消费

想问下作者,你们是如何处理消息幂等的?

@suclogger
Copy link
Member

嗯,你说的问题也是我一直在考虑的。
我觉得通过messageExt中的msgId来维护幂等性是成本很高的(比如将msgId或者key存入redis),所以我目前倾向的方式是通过消息key结合消息体中的业务属性做消费的幂等,如果有好的方式也欢迎提出来。

另,我也越发觉得messageExt中的很多属性需要暴露给消费者,比如我之前是不暴露key的,加了之后抽象类的api就发生了一次变化,依赖方就很痛苦了,也不愿意升级,所以需要更好的方式,正在考虑放到一个k-v的集合当中,有好的想法也欢迎加入一起改善。

@jaychang9
Copy link
Author

jaychang9 commented Sep 21, 2017

消息key结合消息体中的业务属性做消费的幂等我觉得也是可以的。

关于“我也越发觉得messageExt中的很多属性需要暴露给消费者”,深有同感哈,我fork的在我司内部版本,就将messageExt的tag也暴露给消费者了。这个我感觉还是很有必要的,因为当我订阅的Topic有多个tag的时候,按照目前的设计,我就不能根据tag来区分不同的业务场景。我只能再多建几个Topic。

还有一个问题我想说下,按照目前的设计,由于consumerGroup,topic都是写死在@MQConsumer注解的。这样一来就必须在每个环境都各自部署一套rocketmq服务,因为如果还是用同一套rocketmq服务,意味着各个环境都在共同使用topic,会导致环境间消费者消费错乱。所以我fork的版本,稍微改造了下,允许在消费者中定义topic,consumerGroup属性(优先使用@MQConsumer里的topic与consumerGroup)。这样我们只要配置不同的topic名称,不同的consumerGroup名称,就能隔离不同环境的消息错乱问题。哎,公司穷没办法哈。。。

使用方法如下:
`@MQConsumer(tag = "A")
public class DemoConsumerA extends AbstractMQPushConsumer{

@Value("${rocketmq.demoConsumerA.topic}")
private String topic;

@Value("${rocketmq.demoConsumerA.consumerGroup}")
private String consumerGroup;

@Override
public boolean process(String messageKey, String tag, Demo message) {
    System.out.println("DemoConsumerA.process "+"messageKey=【"+messageKey+"】,message=【"+message+"】");
    try {
        Thread.sleep(50);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return true;
}

}`

application.yml里定义下就好:

rocketmq:
...
demoConsumerA:
topic: TP_DEMO
consumerGroup: CID_DEMO_A

对于生产者还好,因为发送消息的时候,可以指定topic。这是我fork的,经过修改后我们公司使用的版本
https://github.com/jaychang9/qh-rocketmq-spring-boot-starter 本来想发个pull request的,但我觉得可能这个特性并不是大家所需要的。

@suclogger
Copy link
Member

@jaychang9
topic和consumerGroup是支持配置项的,具体实现在 代码 MQConsumerAutoConfiguration中:

        String consumerGroup = applicationContext.getEnvironment().getProperty(mqConsumer.consumerGroup());
        if(StringUtils.isEmpty(consumerGroup)) {
            consumerGroup = mqConsumer.consumerGroup();
        }
        String topic = applicationContext.getEnvironment().getProperty(mqConsumer.topic());
        if(StringUtils.isEmpty(topic)) {
            topic = mqConsumer.topic();
        }

会尝试先将注解中的内容解析为一个配置项。

ps.我个人不是很倾向大力推广tag的方式,因为调用方可能无意中使用了同一个consumerGroup来监听同一个topic的多个tag,因为rocketmq的特性就会导致消息丢失或者重复,你在使用的时候也可以注意一下。

还有在我们实际应用场景中,还是做了不同环境的物理隔离,即不同环境部署不同的mq实例,这样可以最大程度避免消息不知道被谁消费掉的问题。

@jaychang9
Copy link
Author

非常感谢作者能及时回复,这个可以有,那比如我的Consumer类定义的注解是@MQConsumer(consumerGroup="CID_DEMO",topic="TP_DEMO") 如果我想覆盖注解中的consumerGroup topic配置的话,是不是加下启动参数,或者是配置一个环境变量?

比如 jar -jar xxx.jar -DCID_DEMO=CID_DEMO_DEV -DTP_DEMO=TP_DEMO_DEV

@suclogger
Copy link
Member

@jaychang9
master分支的最新代码中,使用了extMap 来存放messageExt中的属性和message.properties中的属性,一起帮忙看看这样实现合适不。

@suclogger
Copy link
Member

关于消息幂等的处理还是决定交给业务方通过extMap中的属性来自行处理

@suclogger suclogger added this to the 0.0.5 milestone Nov 9, 2017
@jaychang9
Copy link
Author

好的,我去看看,这几天没看github

@jaychang9
Copy link
Author

看了下哈,我感觉还不如直接把MessageExt暴露出来给使用者。。。用的时候还得从map里根据key取出来,而且还得强转类型。因为存的时候值是Object类型。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants