-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.html
3209 lines (2393 loc) · 276 KB
/
index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<!DOCTYPE html>
<!--[if IEMobile 7 ]><html class="no-js iem7"><![endif]-->
<!--[if lt IE 9]><html class="no-js lte-ie8"><![endif]-->
<!--[if (gt IE 8)|(gt IEMobile 7)|!(IEMobile)|!(IE)]><!--><html class="no-js" lang="en"><!--<![endif]-->
<head>
<meta charset="utf-8">
<title>薛定谔的风口猪</title>
<meta name="author" content="Jaskey Lam">
<meta name="description" content="Jaskey的个人博客">
<meta name="keywords" content="Java, JavaScript, js,git, css#">
<!-- http://t.co/dKP3o1e -->
<meta name="HandheldFriendly" content="True">
<meta name="MobileOptimized" content="320">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="canonical" href="https://Jaskey.github.io">
<link href="/favicon.png" rel="icon">
<link href="/stylesheets/screen.css" media="screen, projection" rel="stylesheet" type="text/css">
<link href="/atom.xml" rel="alternate" title="薛定谔的风口猪" type="application/atom+xml">
<script src="/javascripts/modernizr-2.0.js"></script>
<script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script>
<script>!window.jQuery && document.write(unescape('%3Cscript src="./javascripts/libs/jquery.min.js"%3E%3C/script%3E'))</script>
<script src="/javascripts/octopress.js" type="text/javascript"></script>
<!--linkedin source-->
<script type="text/javascript" src="https://platform.linkedin.com/badges/js/profile.js" async defer></script>
<!--Fonts from Google"s Web font directory at http://google.com/webfonts -->
<link href="//fonts.googleapis.com/css?family=PT+Serif:regular,italic,bold,bolditalic" rel="stylesheet" type="text/css">
<link href="//fonts.googleapis.com/css?family=PT+Sans:regular,italic,bold,bolditalic" rel="stylesheet" type="text/css">
</head>
<body >
<header role="banner"><hgroup>
<h1><a href="/">薛定谔的风口猪</a></h1>
<h2>站在巨人的肩膀上学习,猪都能看得很远</h2>
</hgroup>
</header>
<nav role="navigation"><ul class="subscription" data-subscription="rss">
<li><a href="/atom.xml" rel="subscribe-rss" title="subscribe via RSS">RSS</a></li>
</ul>
<form action="https://www.google.com/search" method="get">
<fieldset role="search">
<input type="hidden" name="q" value="site:Jaskey.github.io" />
<input class="search" type="text" name="q" results="0" placeholder="Search"/>
</fieldset>
</form>
<ul class="main-navigation"><!--导航栏-->
<li><a href="/">主页</a></li>
<li><a href="/blog/archives/">所有博文</a></li>
<!-- <li><a href="/about" target="_blank">关于作者</a></li> –> <!--about 文件夹下的index.html-->
<li><a href="http://www.zhihu.com/people/linjunjie1103/answers?order_by=vote_num" target="_blank" >知乎主页</a></li><!--跳转到知乎主页-->
</ul>
</nav>
<div id="main">
<div id="content">
<div class="blog-index">
<article>
<header>
<h1 class="entry-title"><a href="/blog/2022/04/14/cache-consistency/">挑战大型系统的缓存设计——应对一致性问题</a></h1>
<p class="meta">
<time class='entry-date' datetime='2022-04-14T17:03:26+08:00'><span class='date'>2022-04-14 Thu</span> <span class='time'>17:03</span></time>
</p>
</header>
<div class="entry-content"><p>在真实的业务场景中,我们业务的数据——例如订单、会员、支付等——都是持久化到数据库中的,因为数据库能有很好的事务保证、持久化保证。但是,正因为数据库要能够满足这么多优秀的功能特性,使得数据库在设计上通常难以兼顾到性能,因此往往不能满足大型流量下的性能要求,像是 MySQL 数据库只能承担“千”这个级别的 QPS,否则很可能会不稳定,进而导致整个系统的故障。</p>
<p>但是客观上,我们的业务规模很可能要求着更高的 QPS,有些业务的规模本身就非常大,也有些业务会遇到一些流量高峰,比如电商会遇到大促的情况。</p>
<p>而这时候大部分的流量实际上都是<strong>读请求</strong>,而且大部分数据也是没有那么多变化的,如热门商品信息、微博的内容等常见数据就是如此。此时,<strong>缓存就是我们应对此类场景的利器</strong>。</p>
<h2>缓存的意义</h2>
<p>所谓缓存,实际上就是用空间换时间,准确地说是用<strong>更高速的空间来换时间</strong>,从而<strong>整体</strong><strong>上</strong><strong>提升读的性能</strong>。</p>
<p>何为更高速的空间呢?</p>
<ol>
<li>更快的存储介质。通常情况下,如果说数据库的速度慢,就得用更快的存储介质去替代它,目前最常见的就是Redis。Redis 单实例的读 QPS 可以高达 10w/s,90% 的场景下只需要正确使用 Redis 就能应对。</li>
<li>就近使用本地内存。就像 CPU 也有高速缓存一样,缓存也可以分为一级缓存、二级缓存。即便 Redis 本身性能已经足够高了,但访问一次 Redis 毕竟也需要一次网络 IO,而使用本地内存无疑有更快的速度。不过单机的内存是十分有限的,所以这种一级缓存只能存储非常少量的数据,通常是最热点的那些 key 对应的数据。这就相当于额外消耗宝贵的服务内存去换取高速的读取性能。</li>
</ol>
<h2>引入缓存后的一致性挑战</h2>
<p>用空间换时间,意味着数据同时存在于多个空间。最常见的场景就是数据同时存在于 Redis 与 MySQL 上(为了问题的普适性,后面举例中若没有特别说明,缓存均指 Redis 缓存)。</p>
<p>实际上,最权威最全的数据还是在 MySQL 里的,只要 Redis 数据没有得到及时的更新而导致最新数据没有同步到 Redis 中,就出现了数据不一致。</p>
<p>大部分情况下,只要使用了缓存,就必然会有不一致的情况出现,只是说这个不一致的时间窗口是否能做到足够的小。有些不合理的设计可能会导致数据持续不一致,这是我们需要改善设计去避免的。</p>
<h2>缓存不一致性无法客观地完全消灭</h2>
<p>为什么我们几乎没办法做到缓存和数据库之间的强一致呢?</p>
<p>正常情况下,我们需要在数据库更新完后,把对应的最新数据同步到缓存中,以便在读请求的时候,能读到新的数据而不是旧的数据(脏数据)。但是很可惜,由于数据库和 Redis 之间是没有事务保证的,所以我们无法确保写入数据库成功后,写入 Redis 也是一定成功的;即便 Redis 写入能成功,在数据库写入成功后到 Redis 写入成功前的这段时间里,Redis 数据也肯定是和 MySQL 不一致的。如下图:</p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-1.png" title="image-1" alt="图片" /></p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-2.png" title="image-2" alt="图片" />
所以说这个时间窗口是没办法完全消灭的,除非我们付出极大的代价,使用分布式事务等各种手段去维持强一致,但是这样会使得系统的整体性能大幅度下降,甚至比不用缓存还慢,这样不就与我们使用缓存的目标背道而驰了吗?</p>
<p>不过虽然无法做到强一致,但是我们能做到的是缓存与数据库达到最终一致,而且不一致的时间窗口我们能做到尽可能短,按照经验来说,如果能将时间优化到 1ms 之内,这个一致性问题带来的影响我们就可以忽略不计。</p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-3.png" title="image-3" alt="图片" /></p>
<h2>更新缓存的手段</h2>
<p>通常情况下,我们在处理查询请求的时候,使用缓存的逻辑如下:</p>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<span class='line-number'>4</span>
<span class='line-number'>5</span>
<span class='line-number'>6</span>
<span class='line-number'>7</span>
</pre></td><td class='code'><pre><code class=''><span class='line'>data = queryDataRedis(key);
</span><span class='line'>if (data ==null) {
</span><span class='line'> data = queryDataMySQL(key); //缓存查询不到,从MySQL做查询
</span><span class='line'> if (data!=null) {
</span><span class='line'> updateRedis(key, data);//查询完数据后更新到MySQL
</span><span class='line'> }
</span><span class='line'>}</span></code></pre></td></tr></table></div></figure>
<p>也就是说<strong>优先查询缓存,查询不到才查询数据库</strong>。如果这时候数据库查到数据了,就将缓存的数据进行更新。
这样的逻辑是正确的,而一致性的问题一般不来源于此,而是出现在处理<strong>写请求</strong>的时候。所以我们简化成最简单的写请求的逻辑,此时你可能会面临多个选择,究竟是直接更新缓存,还是失效缓存?而无论是更新缓存还是失效缓存,都可以选择在更新数据库之前,还是之后操作。</p>
<p>这样就演变出 4 个策略:<strong>更新数据库后更新缓存、更新数据库前更新缓存、更新数据库后删除缓存、更新数据库前删除缓存</strong><strong>。</strong>下面我们来分别讲述。</p>
<h3>更新数据库后更新缓存的不一致问题</h3>
<p>一种常见的操作是,设置一个过期时间,让写请求以数据库为准,过期后,读请求同步数据库中的最新数据给缓存。那么在加入了过期时间后,是否就不会有问题了呢?并不是这样。</p>
<p>大家设想一下这样的场景。</p>
<p>假如这里有一个计数器,把数据库自减 1,原始数据库数据是 100,同时有两个写请求申请计数减一,假设线程 A 先减数据库成功,线程 B 后减数据库成功。那么这时候数据库的值是 98,缓存里正确的值应该也要是 98。</p>
<p>但是特殊场景下,你可能会遇到这样的情况:</p>
<ol>
<li>线程 A 和线程 B 同时更新这个数据</li>
<li>更新数据库的顺序是先 A 后 B</li>
<li>更新缓存时顺序是先 B 后 A
如果我们的代码逻辑还是更新数据库后立刻更新缓存的数据,那么——</li>
</ol>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
</pre></td><td class='code'><pre><code class=''><span class='line'>updateMySQL();
</span><span class='line'>updateRedis(key, data);</span></code></pre></td></tr></table></div></figure>
<p>就可能出现:数据库的值是 100->99->98,但是缓存的数据却是 100->98->99,也就是数据库与缓存的不一致。而且这个不一致只能等到下一次数据库更新或者缓存失效才可能修复。</p>
<table>
<thead>
<tr>
<th style="text-align:left;"> <strong>时间</strong> </th>
<th style="text-align:left;"> <strong>线程A(写请求)</strong> </th>
<th style="text-align:left;"> <strong>线程B(写请求)</strong> </th>
<th style="text-align:left;"> <strong>问题</strong> </th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left;"> T1 </td>
<td style="text-align:left;"> 更新数据库为99 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T2 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 更新数据库为98 </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T3 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 更新缓存数据为98 </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T4 </td>
<td style="text-align:left;"> 更新缓存数据为99 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 此时缓存的值被显式更新为99,但是实际上数据库的值已经是98,数据不一致 </td>
</tr>
</tbody>
</table>
<h3>更新数据库前更新缓存的不一致问题</h3>
<p>那你可能会想,这是否表示,我应该先让缓存更新,之后再去更新数据库呢?类似这样:</p>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
</pre></td><td class='code'><pre><code class=''><span class='line'>updateRedis(key, data);//先更新缓存
</span><span class='line'>updateMySQL();//再更新数据库</span></code></pre></td></tr></table></div></figure>
<p>这样操作产生的问题更是显而易见的,因为我们无法保证数据库的更新成功,万一数据库更新失败了,你缓存的数据就不只是脏数据,而是错误数据了。
你可能会想,是否我在更新数据库失败的时候做 Redis 回滚的操作能够解决呢?这其实也是不靠谱的,因为我们也不能保证这个回滚的操作 100% 被成功执行。</p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-4.png" title="image-4" alt="图片" /></p>
<p>同时,在写写并发的场景下,同样有类似的一致性问题,请看以下情况:</p>
<ol>
<li>线程 A 和线程 B 同时更新同这个数据</li>
<li>更新缓存的顺序是先 A 后 B</li>
<li>更新数据库的顺序是先 B 后 A
举个例子。线程 A 希望把计数器置为 0,线程 B 希望置为 1。而按照以上场景,缓存确实被设置为 1,但数据库却被设置为 0。</li>
</ol>
<table>
<thead>
<tr>
<th style="text-align:left;"> <strong>时间</strong> </th>
<th style="text-align:left;"> <strong>线程A(写请求)</strong> </th>
<th style="text-align:left;"> <strong>线程B(写请求)</strong> </th>
<th style="text-align:left;"> <strong>问题</strong> </th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left;"> T1 </td>
<td style="text-align:left;"> 更新缓存为0 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T2 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 更新缓存为1 </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T3 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 更新数据库为1 </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T4 </td>
<td style="text-align:left;"> 更新数据库数据为0 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 此时缓存的值被显式更新为1,但是实际上数据库的值是0,数据不一致 </td>
</tr>
</tbody>
</table>
<p>所以<strong>通常情况下,更新缓存再更新数据库是我们</strong><strong>应该避免使用</strong><strong>的</strong><strong>一种</strong><strong>手段</strong>。</p>
<h3>更新数据库前删除缓存的问题</h3>
<p>那如果采取删除缓存的策略呢?也就是说我们在更新数据库的时候失效对应的缓存,让缓存在下次触发读请求时进行更新,是否会更好呢?同样地,针对在更新数据库前和数据库后这两个删除时机,我们来比较下其差异。</p>
<p>最直观的做法,我们可能会先让缓存失效,然后去更新数据库,代码逻辑如下:</p>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
</pre></td><td class='code'><pre><code class=''><span class='line'>deleteRedis(key);//先删除缓存让缓存失效
</span><span class='line'>updateMySQL();//再更新数据库</span></code></pre></td></tr></table></div></figure>
<p>这样的逻辑看似没有问题,毕竟删除缓存后即便数据库更新失败了,也只是缓存上没有数据而已。然后并发两个写请求过来,无论怎么样的执行顺序,缓存最后的值也都是会被删除的,也就是说在并发写写的请求下这样的处理是没问题的。
然而,这种处理在读写并发的场景下却存在着隐患。</p>
<p>还是刚刚更新计数的例子。例如现在缓存的数据是 100,数据库也是 100,这时候需要对此计数减 1,减成功后,数据库应该是 99。如果这之后触发读请求,缓存如果有效的话,里面应该也要被更新为 99 才是正确的。</p>
<p>那么思考下这样的请求情况:</p>
<ol>
<li>线程 A 更新这个数据的同时,线程 B 读取这个数据</li>
<li>线程 A 成功删除了缓存里的老数据,这时候线程 B 查询数据发现缓存失效</li>
<li>线程 A 更新数据库成功</li>
</ol>
<table>
<thead>
<tr>
<th style="text-align:left;"> <strong>时间</strong> </th>
<th style="text-align:left;"> <strong>线程A(写请求)</strong> </th>
<th style="text-align:left;"> <strong>线程B(读请求)</strong> </th>
<th style="text-align:left;"> <strong>问题</strong> </th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left;"> T1 </td>
<td style="text-align:left;"> 删除缓存值 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T2 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 1.读取缓存数据,缓存缺失,从数据库读取数据100 </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T3 </td>
<td style="text-align:left;"> 更新数据库中的数据X的值为99 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T4 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 将数据100的值写入缓存 </td>
<td style="text-align:left;"> 此时缓存的值被显式更新为100,但是实际上数据库的值已经是99了 </td>
</tr>
</tbody>
</table>
<p>可以看到,在读写并发的场景下,一样会有不一致的问题。</p>
<p>针对这种场景,有个做法是所谓的“<strong>延迟双删策略</strong>”,就是说,既然可能因为读请求把一个旧的值又写回去,那么我在写请求处理完之后,等到差不多的时间延迟再重新删除这个缓存值。</p>
<table>
<thead>
<tr>
<th style="text-align:left;"> <strong>时间</strong> </th>
<th style="text-align:left;"> <strong>线程A(写请求)</strong> </th>
<th style="text-align:left;"> <strong>线程C(新的读请求)</strong> </th>
<th style="text-align:left;"> <strong>线程D(新的读请求)</strong> </th>
<th style="text-align:left;"> <strong>问题</strong> </th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left;"> T5 </td>
<td style="text-align:left;"> sleep(N) </td>
<td style="text-align:left;"> 缓存存在,读取到缓存旧值100 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 其他线程可能在双删成功前读到脏数据 </td>
</tr>
<tr>
<td style="text-align:left;"> T6 </td>
<td style="text-align:left;"> 删除缓存值 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T7 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 缓存缺失,从数据库读取数据的最新值(99) </td>
<td style="text-align:left;"> </td>
</tr>
</tbody>
</table>
<p>这种解决思路的关键在于对 N 的时间的判断,如果 N 时间太短,线程 A 第二次删除缓存的时间依旧早于线程 B 把脏数据写回缓存的时间,那么相当于做了无用功。而 N 如果设置得太长,那么在触发双删之前,新请求看到的都是脏数据。</p>
<h3>更新数据库后删除缓存</h3>
<p>那如果我们把更新数据库放在删除缓存之前呢,问题是否解决?我们继续从读写并发的场景看下去,有没有类似的问题。</p>
<table>
<thead>
<tr>
<th style="text-align:left;"> <strong>时间</strong> </th>
<th style="text-align:left;"> <strong>线程A(写请求)</strong> </th>
<th style="text-align:left;"> <strong>线程B(读请求)</strong> </th>
<th style="text-align:left;"> <strong>线程C(读请求)</strong> </th>
<th style="text-align:left;"> <strong>潜在问题</strong> </th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left;"> T1 </td>
<td style="text-align:left;"> 更新主库 X = 99(原值 X = 100) </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T2 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 读取数据,查询到缓存还有数据,返回100 </td>
<td style="text-align:left;"> 线程C实际上读取到了和数据库不一致的数据 </td>
</tr>
<tr>
<td style="text-align:left;"> T3 </td>
<td style="text-align:left;"> 删除缓存 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T4 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 查询缓存,缓存缺失,查询数据库得到当前值99 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T5 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 将99写入缓存 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
</tbody>
</table>
<p>可以看到,大体上,采取先更新数据库再删除缓存的策略是没有问题的,仅在更新数据库成功到缓存删除之间的时间差内,可能会被别的线程读取到老值。</p>
<p>而在开篇的时候我们说过,缓存不一致性的问题无法在客观上完全消灭,因为我们无法保证数据库和缓存的操作是一个事务里的,而我们能做到的只是尽量缩短不一致的时间窗口。</p>
<p>在更新数据库后删除缓存这个场景下,不一致窗口仅仅是 T2 到 T3 的时间,大概是 1ms 左右,在大部分业务场景下我们都可以忽略不计。</p>
<p>但是真实场景下,还是会有一个情况存在不一致的可能性,这个场景是读线程发现缓存不存在,于是读写并发时,读线程回写进去老值。并发情况如下:</p>
<table>
<thead>
<tr>
<th style="text-align:left;"> <strong>时间</strong> </th>
<th style="text-align:left;"> <strong>线程A(写请求)</strong> </th>
<th style="text-align:left;"> <strong>线程B(读请求–缓存不存在场景)</strong> </th>
<th style="text-align:left;"> <strong>潜在问题</strong> </th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left;"> T1 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 查询缓存,缓存缺失,查询数据库得到当前值100 </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T2 </td>
<td style="text-align:left;"> 更新主库 X = 99(原值 X = 100) </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T3 </td>
<td style="text-align:left;"> 删除缓存 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> </td>
</tr>
<tr>
<td style="text-align:left;"> T4 </td>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 将100写入缓存 </td>
<td style="text-align:left;"> 此时缓存的值被显式更新为100,但是实际上数据库的值已经是99了 </td>
</tr>
</tbody>
</table>
<p>总的来说,这个不一致场景出现条件非常严格,因为并发量很大时,缓存不太可能不存在;如果并发很大,而缓存真的不存在,那么很可能是这时的写场景很多,因为写场景会删除缓存。所以待会我们会提到,写场景很多时候实际上并不适合采取删除策略。</p>
<h3>总结四种更新策略</h3>
<p>终上所述,我们对比了四个更新缓存的手段,做一个总结对比,如下图:</p>
<table>
<thead>
<tr>
<th style="text-align:left;"> <strong>策略</strong> </th>
<th style="text-align:left;"> <strong>并发场景</strong> </th>
<th style="text-align:left;"> <strong>潜在问题</strong> </th>
<th style="text-align:left;"> <strong>应对方案</strong> </th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left;"> 更新数据库+更新缓存 </td>
<td style="text-align:left;"> 写+读 </td>
<td style="text-align:left;"> 线程A未更新完缓存之前,线程B的读请求会短暂读到旧值 </td>
<td style="text-align:left;"> 可以忽略 </td>
</tr>
<tr>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 写+写 </td>
<td style="text-align:left;"> 更新数据库的顺序是先A后B,但更新缓存时顺序是先B后A,数据库和缓存数据不一致 </td>
<td style="text-align:left;"> 分布式锁(操作重) </td>
</tr>
<tr>
<td style="text-align:left;"> 更新缓存+更新数据库 </td>
<td style="text-align:left;"> 无并发 </td>
<td style="text-align:left;"> 线程A还未更新完缓存但是更新数据库可能失败 </td>
<td style="text-align:left;"> 利用MQ确认数据库更新成功(较复杂) </td>
</tr>
<tr>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 写+写 </td>
<td style="text-align:left;"> 更新缓存的顺序是先A后B,但更新数据库时顺序是先B后A </td>
<td style="text-align:left;"> 分布式锁(操作很重) </td>
</tr>
<tr>
<td style="text-align:left;"> 删除缓存值+更新数据库 </td>
<td style="text-align:left;"> 写+读 </td>
<td style="text-align:left;"> 写请求的线程A删除了缓存在更新数据库之前,这时候读请求线程B到来,因为缓存缺失,则把当前数据读取出来放到缓存,而后线程A更新成功了数据库 </td>
<td style="text-align:left;"> 延迟双删(但是延迟的时间不好估计,且延迟的过程中依旧有不一致的时间窗口) </td>
</tr>
<tr>
<td style="text-align:left;"> 更新数据库+删除缓存值 </td>
<td style="text-align:left;"> 写+读(缓存命中) </td>
<td style="text-align:left;"> 线程A完成数据库更新成功后,尚未删除缓存,线程B有并发读请求会读到旧的脏数据<br> </td>
<td style="text-align:left;"> 可以忽略 </td>
</tr>
<tr>
<td style="text-align:left;"> </td>
<td style="text-align:left;"> 写+读(缓存不命中) </td>
<td style="text-align:left;"> 读请求不命中缓存,写请求处理完之后读请求才回写缓存,此时缓存不一致 </td>
<td style="text-align:left;"> 分布式锁(操作重) </td>
</tr>
</tbody>
</table>
<p><strong>从一致性的角度来看,采取更新数据库后删除缓存值,是更为适合的策略</strong><strong>。</strong>因为出现不一致的场景的条件更为苛刻,概率相比其他方案更低。</p>
<p>那么是否更新缓存这个策略就一无是处呢?不是的!</p>
<p>删除缓存值意味着对应的 key 会失效,那么这时候读请求都会打到数据库。如果这个数据的写操作非常频繁,就会导致缓存的作用变得非常小。而如果这时候某些 Key 还是非常大的热 key,就可能因为扛不住数据量而导致系统不可用。</p>
<p>如下图所示:</p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-5.png" title="删除策略频繁的缓存失效导致读请求无法利用缓存" alt="图片" /></p>
<p>所以做个简单总结,足以适应绝大部分的互联网开发场景的决策:</p>
<ul>
<li><p><strong>针对大部分读多写少场景,建议选择更新数据库后删除缓存的策略。</strong></p></li>
<li><p><strong>针对读写相当或者写多读少的场景,建议选择更新数据库后更新缓存的策略。</strong></p></li>
</ul>
<h2>最终一致性如何保证?</h2>
<h3>缓存设置过期时间</h3>
<p>第一个方法便是我们上面提到的,当我们无法确定 MySQL 更新完成后,缓存的更新/删除一定能成功,例如 Redis 挂了导致写入失败了,或者当时网络出现故障,更常见的是服务当时刚好发生重启了,没有执行这一步的代码。</p>
<p>这些时候 MySQL 的数据就无法刷到 Redis 了。为了避免这种不一致性永久存在,使用缓存的时候,我们必须要给缓存设置一个过期时间,例如 1 分钟,这样即使出现了更新 Redis 失败的极端场景,不一致的时间窗口最多也只是 1 分钟。</p>
<p>这是我们最终一致性的兜底方案,万一出现任何情况的不一致问题,最后都能通过缓存失效后重新查询数据库,然后回写到缓存,来做到缓存与数据库的最终一致。</p>
<h3>如何减少缓存删除/更新的失败?</h3>
<p>万一删除缓存这一步因为服务重启没有执行,或者 Redis 临时不可用导致删除缓存失败了,就会有一个较长的时间(缓存的剩余过期时间)是数据不一致的。</p>
<p>那我们有没有什么手段来减少这种不一致的情况出现呢?这时候借助一个可靠的消息中间件就是一个不错的选择。</p>
<p>因为消息中间件有 ATLEAST-ONCE 的机制,如下图所示。</p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-6.png" title="image-6" alt="图片" /></p>
<p>我们把删除 Redis 的请求以消费 MQ 消息的手段去失效对应的 Key 值,如果 Redis 真的存在异常导致无法删除成功,我们依旧可以依靠 MQ 的重试机制来让最终 Redis 对应的 Key 失效。</p>
<p>而你们或许会问,极端场景下,是否存在更新数据库后 MQ 消息没发送成功,或者没机会发送出去机器就重启的情况?</p>
<p>这个场景的确比较麻烦,如果 MQ 使用的是 RocketMQ,我们可以借助 RocketMQ 的事务消息,来让删除缓存的消息最终一定发送出去。而如果你没有使用 RocketMQ,或者你使用的消息中间件并没有事务消息的特性,则可以采取消息表的方式让更新数据库和发送消息一起成功。事实上这个话题比较大了,我们不在这里展开。</p>
<h3>如何处理复杂的多缓存场景?</h3>
<p>有些时候,真实的缓存场景并不是数据库中的一个记录对应一个 Key 这么简单,有可能一个数据库记录的更新会牵扯到多个 Key 的更新。还有另外一个场景是,更新不同的数据库的记录时可能需要更新同一个 Key 值,这常见于一些 App 首页数据的缓存。</p>
<p>我们以一个数据库记录对应多个 Key 的场景来举例。</p>
<p>假如系统设计上我们缓存了一个粉丝的主页信息、主播打赏榜 TOP10 的粉丝、单日 TOP 100 的粉丝等多个信息。如果这个粉丝注销了,或者这个粉丝触发了打赏的行为,上面多个 Key 可能都需要更新。只是一个打赏的记录,你可能就要做:</p>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<span class='line-number'>4</span>
</pre></td><td class='code'><pre><code class=''><span class='line'>updateMySQL();//更新数据库一条记录
</span><span class='line'>deleteRedisKey1();//失效主页信息的缓存
</span><span class='line'>updateRedisKey2();//更新打赏榜TOP10
</span><span class='line'>deleteRedisKey3();//更新单日打赏榜TOP100</span></code></pre></td></tr></table></div></figure>
<p>这就涉及多个 Redis 的操作,每一步都可能失败,影响到后面的更新。甚至从系统设计上,更新数据库可能是单独的一个服务,而这几个不同的 Key 的缓存维护却在不同的 3 个微服务中,这就大大增加了系统的复杂度和提高了缓存操作失败的可能性。最可怕的是,操作更新记录的地方很大概率不只在一个业务逻辑中,而是散发在系统各个零散的位置。
针对这个场景,解决方案和上文提到的保证最终一致性的操作一样,就是把更新缓存的操作以 MQ 消息的方式发送出去,由不同的系统或者专门的一个系统进行订阅,而做聚合的操作。如下图:</p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-7.png" title="不同业务系统订阅MQ消息单独维护各自的缓存Key" alt="图片" /></p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-8.png" title="专门更新缓存的服务订阅MQ消息维护所有相关Key的缓存操作" alt="图片" /></p>
<h3>通过订阅 MySQL binlog 的方式处理缓存</h3>
<p>上面讲到的 MQ 处理方式需要业务代码里面显式地发送 MQ 消息。还有一种优雅的方式便是订阅 MySQL 的 binlog,监听数据的真实变化情况以处理相关的缓存。</p>
<p>例如刚刚提到的例子中,如果粉丝又触发打赏了,这时候我们利用 binlog 表监听是能及时发现的,发现后就能集中处理了,而且无论是在什么系统什么位置去更新数据,都能做到集中处理。</p>
<p>目前业界类似的产品有 Canal,具体的操作图如下:</p>
<p><img src="http://jaskey.github.io/images/cache-consistency/image-9.png" title="利用Canel订阅数据库binlog变更从而发出MQ消息,让一个专门消费者服务维护所有相关Key的缓存操作" alt="图片" /></p>
<p>到这里,针对大型系统缓存设计如何保证最终一致性,我们已经从策略、场景、操作方案等角度进行了细致的讲述,这些是我根据多年开发经验进行总结的,希望能对你起到帮助。</p>
</div>
</article>
<article>
<header>
<h1 class="entry-title"><a href="/blog/2020/11/26/rocketmq-consumer-allocate/">为什么在一段时间内RocketMQ的队列同时分配给了两个消费者?详细剖析消费者负载均衡中的坑(上)</a></h1>
<p class="meta">
<time class='entry-date' datetime='2020-11-26T15:37:53+08:00'><span class='date'>2020-11-26 Thu</span> <span class='time'>15:37</span></time>
</p>
</header>
<div class="entry-content"><p>之前的文章有提到过,消费者大概是怎么做负载均衡的(集群模式),如下图所示:</p>
<p><img src="https://jaskey.github.io/images/rocketmq/consumer-loadbalance1.png" alt="消费者负载均衡" /></p>
<p>集群模式下,每个消费者实例会被分配到若干条队列。正因为消费者拿到了明确的队列,所以它们才能针对对应的队列做循环拉取消息的处理,以下是消费者客户端和broker通信的部分代码,可以看到通信的参数里有一个重要的参数,就是queueId</p>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<span class='line-number'>4</span>
<span class='line-number'>5</span>
<span class='line-number'>6</span>
<span class='line-number'>7</span>
<span class='line-number'>8</span>
<span class='line-number'>9</span>
<span class='line-number'>10</span>
<span class='line-number'>11</span>
<span class='line-number'>12</span>
<span class='line-number'>13</span>
<span class='line-number'>14</span>
<span class='line-number'>15</span>
<span class='line-number'>16</span>
<span class='line-number'>17</span>
<span class='line-number'>18</span>
<span class='line-number'>19</span>
<span class='line-number'>20</span>
<span class='line-number'>21</span>
<span class='line-number'>22</span>
<span class='line-number'>23</span>
<span class='line-number'>24</span>
</pre></td><td class='code'><pre><code class='java'><span class='line'> <span class="n">PullMessageRequestHeader</span> <span class="n">requestHeader</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">PullMessageRequestHeader</span><span class="o">();</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setConsumerGroup</span><span class="o">(</span><span class="k">this</span><span class="o">.</span><span class="na">consumerGroup</span><span class="o">);</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setTopic</span><span class="o">(</span><span class="n">mq</span><span class="o">.</span><span class="na">getTopic</span><span class="o">());</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setQueueId</span><span class="o">(</span><span class="n">mq</span><span class="o">.</span><span class="na">getQueueId</span><span class="o">());</span><span class="c1">//消息拉取必须显示的告诉broker拉取哪个queue的消息</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setQueueOffset</span><span class="o">(</span><span class="n">offset</span><span class="o">);</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setMaxMsgNums</span><span class="o">(</span><span class="n">maxNums</span><span class="o">);</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setSysFlag</span><span class="o">(</span><span class="n">sysFlagInner</span><span class="o">);</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setCommitOffset</span><span class="o">(</span><span class="n">commitOffset</span><span class="o">);</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setSuspendTimeoutMillis</span><span class="o">(</span><span class="n">brokerSuspendMaxTimeMillis</span><span class="o">);</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setSubscription</span><span class="o">(</span><span class="n">subExpression</span><span class="o">);</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setSubVersion</span><span class="o">(</span><span class="n">subVersion</span><span class="o">);</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">.</span><span class="na">setExpressionType</span><span class="o">(</span><span class="n">expressionType</span><span class="o">);</span>
</span><span class='line'>
</span><span class='line'> <span class="n">String</span> <span class="n">brokerAddr</span> <span class="o">=</span> <span class="n">findBrokerResult</span><span class="o">.</span><span class="na">getBrokerAddr</span><span class="o">();</span>
</span><span class='line'> <span class="k">if</span> <span class="o">(</span><span class="n">PullSysFlag</span><span class="o">.</span><span class="na">hasClassFilterFlag</span><span class="o">(</span><span class="n">sysFlagInner</span><span class="o">))</span> <span class="o">{</span>
</span><span class='line'> <span class="n">brokerAddr</span> <span class="o">=</span> <span class="n">computPullFromWhichFilterServer</span><span class="o">(</span><span class="n">mq</span><span class="o">.</span><span class="na">getTopic</span><span class="o">(),</span> <span class="n">brokerAddr</span><span class="o">);</span>
</span><span class='line'> <span class="o">}</span>
</span><span class='line'>
</span><span class='line'> <span class="n">PullResult</span> <span class="n">pullResult</span> <span class="o">=</span> <span class="k">this</span><span class="o">.</span><span class="na">mQClientFactory</span><span class="o">.</span><span class="na">getMQClientAPIImpl</span><span class="o">().</span><span class="na">pullMessage</span><span class="o">(</span>
</span><span class='line'> <span class="n">brokerAddr</span><span class="o">,</span>
</span><span class='line'> <span class="n">requestHeader</span><span class="o">,</span>
</span><span class='line'> <span class="n">timeoutMillis</span><span class="o">,</span>
</span><span class='line'> <span class="n">communicationMode</span><span class="o">,</span>
</span><span class='line'> <span class="n">pullCallback</span><span class="o">);</span>
</span></code></pre></td></tr></table></div></figure>
<p>这侧面也再次印证,RocketMQ的消费模型是Pull模式。</p>
<p>同时,对于每个消费者实例来说,在每个消息拉取之前,实际上都是确定了队列的(不会轻易发生改变),如下图控制台所示:</p>
<p><img src="https://jaskey.github.io/images/rocketmq/rocketmq-queue-allocation.png" alt="消费者负载均衡控制台示例" /></p>
<p>本文尝试对RocketMQ负载均衡(哪个消费者消费哪些队列)的原理进行解析,希望能让大家对其中的基本原理进行了解,并对部分问题能作出合理解析和正确规避。</p>
<h2>所谓Rebalance到底在解决什么问题</h2>
<p>RocketMQ每次分配队列的过程,代码里叫Relalance,本文在某些场景下也称为重排,实际上是一个负载均衡的过程。之所以说分配队列的过程就是负载均衡的过程的原因是,RocketMQ是负载均衡分配的就是队列,而不是消息。如果这个过程RocketMQ给了较高负载高,其实并不肯定意味着你能接受更多的消息(虽然绝大部分场景你可以这样理解),而只是说我给你分配了更多的队列。为什么说有更多的队列可能并不代表你有更多消息消费呢?</p>
<p>例如我们举一个例子,两个消费者一个消费者实例A获得了1个队列q0,一个消费者实例B获得了两个队列,这个负载均衡的过程分配了给B更多的”负载”(队列),但是假设消费者B获得的两个队列q1 q2中的q2本身是不可写的(topic可以配置读队列数量,写队列数量,所以是可能存在一些队列可读,但是不可写的情况),又或者生产者手动的选择了发送topic的queue目标(利用selector),这个过程从来都不选择q2,只有q0,和q1在做发送,甚至大部分情况下都往q0发,这时候消费者B实例其实都没有真正意义上的更高负载。</p>
<p>总结一下:就是所谓的消费者Rebalance,其实是分配队列的过程,它本质上希望解决的是一个消费者的负载问题,但是实际的工作其并不直接改变一个消费者实例的真实负载(消息),而是间接的决定的——通过管理分配队列的数量。而平时我们绝大部分可以认为队列的负载就是真实的消息负载的原因是基于这样一个前提:消息的分布基本是均匀分配在不同的队列上的,所以在这个前提下,获得了更多的队列实际上就是获得了更多的消息负载。</p>
<h2>Relance具体是如何决定分配的数量的</h2>
<p>RocketMQ的Rebalance实际上是<strong>无中心</strong>的,这和Kafka有本质区别,Kafka虽然也是客户端做的负载均衡,但是Kafka在做负载均衡之前会选定一个Leader,由这个Leader全局把控分配的过程,而后再把每个消费者对partion的分配结果广播给各个消费者。</p>
<p>而RocketMQ实际上没有人做这个统一分配的,而是每个消费者自己”有秩序地”计算出自己应该获取哪些队列,你可能会觉得很神奇,到底为啥大家能如此有秩序而不打架呢?我们下面来看看。</p>
<p>你可能知道RocketMQ是支持很多负载均衡的算法的,甚至还支持用户自己实现一个负载均衡算法。具体的这个分配算法需要实现以下接口:</p>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<span class='line-number'>4</span>
<span class='line-number'>5</span>
<span class='line-number'>6</span>
<span class='line-number'>7</span>
<span class='line-number'>8</span>
<span class='line-number'>9</span>
<span class='line-number'>10</span>
<span class='line-number'>11</span>
<span class='line-number'>12</span>
<span class='line-number'>13</span>
<span class='line-number'>14</span>
<span class='line-number'>15</span>
<span class='line-number'>16</span>
<span class='line-number'>17</span>
<span class='line-number'>18</span>
</pre></td><td class='code'><pre><code class='java'><span class='line'><span class="cm">/** * Strategy Algorithm for message allocating between consumers */</span><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">AllocateMessageQueueStrategy</span> <span class="o">{</span>
</span><span class='line'>
</span><span class='line'>
</span><span class='line'> <span class="cm">/** </span>
</span><span class='line'><span class="cm"> * Allocating by consumer id </span>
</span><span class='line'><span class="cm"> * </span>
</span><span class='line'><span class="cm"> * @param consumerGroup current consumer group </span>
</span><span class='line'><span class="cm"> * @param currentCID current consumer id </span>
</span><span class='line'><span class="cm"> * @param mqAll message queue set in current topic </span>
</span><span class='line'><span class="cm"> * @param cidAll consumer set in current consumer group </span>
</span><span class='line'><span class="cm"> * @return The allocate result of given strategy */</span>
</span><span class='line'> <span class="n">List</span><span class="o"><</span><span class="n">MessageQueue</span><span class="o">></span> <span class="nf">allocate</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">consumerGroup</span><span class="o">,</span><span class="kd">final</span> <span class="n">String</span> <span class="n">currentCID</span><span class="o">,</span> <span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">MessageQueue</span><span class="o">></span> <span class="n">mqAll</span><span class="o">,</span> <span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">cidAll</span><span class="o">);</span>
</span><span class='line'>
</span><span class='line'>
</span><span class='line'> <span class="cm">/** * Algorithm name </span>
</span><span class='line'><span class="cm"> * * @return The strategy name </span>
</span><span class='line'><span class="cm"> */</span>
</span><span class='line'> <span class="n">String</span> <span class="nf">getName</span><span class="o">();}</span>
</span></code></pre></td></tr></table></div></figure>
<p>这个接口的getName()只是一个唯一标识,用以标识该消费者实例是用什么负载均衡算法去分配队列。</p>
<p>关键在于<code>allocate</code>这个方法,这个方法的出参就是这次Rebalace的结果——本消费者实例应该去获取的队列列表。</p>
<p>其余四个入参分别是:</p>
<p>1.消费者组名</p>
<p>2.当前的消费者实例的唯一ID,实际上就是client 的ip@instanceName。</p>
<p>3.全局这个消费者组可以分配的队列集合</p>
<p>4.当前这个消费者组消费者集合(值是消费者实例的唯一id)</p>
<p>试想下,假设要你去做一个分配队列的算法,实际上最关键的就是两个视图:1.这个topic下全局当前在线的消费者列表,2.topic在全局下有哪些队列。</p>
<p>例如,你知道当前有4个消费者 c1 c2 c3 c4在线,也知道topic 下有 8个队列 q0,q1,q2,q3,q4,…q6,那么8/4=2,你就能知道每个消费者应该获取两个队列。例如: c1–>q0,q1, c2–>q2,q3, c3–>q4,q5, c4–>q5,q6。</p>
<p>实际上,这就是rocketmq默认的分配方案。</p>
<p>但现在唯一的问题在于,我们刚刚说的,我们没有一个中心节点统一地做分配,所以RocketMQ需要做一定的修改。如对于C1:</p>
<p>“我是C1,我知道当前有4个消费者 c1 c2 c3 c4在线,也知道topic 下有 8个队列 q0,q1,q2,q3,q4,…q6,那么8/4=2,我就能知道每个消费者应该获取两个队列,而我算出来我要的队列是c1–>q0,q1”。</p>
<p>同理对于C2:</p>
<p>“我是C2,我知道当前有4个消费者 c1 c2 c3 c4在线,也知道topic 下有 8个队列 q0,q1,q2,q3,q4,…q6,那么8/4=2,我就能知道每个消费者应该获取两个队列,而我算出来我要的队列是c2–>q2,q3。</p>
<p>要做到无中心的完成这个目标,唯一需要增加的输入项就是“我是C1”,”我是C2”这样的入参,所以上文提到的<code>allocate</code>方法下面<strong>当前的消费者实例</strong>的唯一ID就是干这个事用的。以下是一个默认的策略,本人添加了中文注释,以达到的就是上文例子中的分配结果:</p>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<span class='line-number'>4</span>
<span class='line-number'>5</span>
<span class='line-number'>6</span>
<span class='line-number'>7</span>
<span class='line-number'>8</span>
<span class='line-number'>9</span>
<span class='line-number'>10</span>
<span class='line-number'>11</span>
<span class='line-number'>12</span>
<span class='line-number'>13</span>
<span class='line-number'>14</span>
<span class='line-number'>15</span>
<span class='line-number'>16</span>
<span class='line-number'>17</span>
<span class='line-number'>18</span>
<span class='line-number'>19</span>
<span class='line-number'>20</span>
<span class='line-number'>21</span>
<span class='line-number'>22</span>
<span class='line-number'>23</span>
<span class='line-number'>24</span>
<span class='line-number'>25</span>
<span class='line-number'>26</span>
<span class='line-number'>27</span>
<span class='line-number'>28</span>
<span class='line-number'>29</span>
<span class='line-number'>30</span>
<span class='line-number'>31</span>
<span class='line-number'>32</span>
<span class='line-number'>33</span>
<span class='line-number'>34</span>
<span class='line-number'>35</span>
<span class='line-number'>36</span>
<span class='line-number'>37</span>
</pre></td><td class='code'><pre><code class='java'><span class='line'><span class="nd">@Override</span>
</span><span class='line'><span class="kd">public</span> <span class="n">List</span><span class="o"><</span><span class="n">MessageQueue</span><span class="o">></span> <span class="nf">allocate</span><span class="o">(</span><span class="n">String</span> <span class="n">consumerGroup</span><span class="o">,</span> <span class="n">String</span> <span class="n">currentCID</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">MessageQueue</span><span class="o">></span> <span class="n">mqAll</span><span class="o">,</span><span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">cidAll</span><span class="o">)</span> <span class="o">{</span>
</span><span class='line'>
</span><span class='line'> <span class="c1">//START: 一些前置的判断</span>
</span><span class='line'> <span class="k">if</span> <span class="o">(</span><span class="n">currentCID</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="n">currentCID</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o"><</span> <span class="mi">1</span><span class="o">)</span> <span class="o">{</span>
</span><span class='line'> <span class="k">throw</span> <span class="k">new</span> <span class="nf">IllegalArgumentException</span><span class="o">(</span><span class="s">"currentCID is empty"</span><span class="o">);</span>
</span><span class='line'> <span class="o">}</span>
</span><span class='line'> <span class="k">if</span> <span class="o">(</span><span class="n">mqAll</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="n">mqAll</span><span class="o">.</span><span class="na">isEmpty</span><span class="o">())</span> <span class="o">{</span>
</span><span class='line'> <span class="k">throw</span> <span class="k">new</span> <span class="nf">IllegalArgumentException</span><span class="o">(</span><span class="s">"mqAll is null or mqAll empty"</span><span class="o">);</span>
</span><span class='line'> <span class="o">}</span>
</span><span class='line'> <span class="k">if</span> <span class="o">(</span><span class="n">cidAll</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="n">cidAll</span><span class="o">.</span><span class="na">isEmpty</span><span class="o">())</span> <span class="o">{</span>
</span><span class='line'> <span class="k">throw</span> <span class="k">new</span> <span class="nf">IllegalArgumentException</span><span class="o">(</span><span class="s">"cidAll is null or cidAll empty"</span><span class="o">);</span>
</span><span class='line'> <span class="o">}</span>
</span><span class='line'>
</span><span class='line'> <span class="n">List</span><span class="o"><</span><span class="n">MessageQueue</span><span class="o">></span> <span class="n">result</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">MessageQueue</span><span class="o">>();</span>
</span><span class='line'> <span class="k">if</span> <span class="o">(!</span><span class="n">cidAll</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="n">currentCID</span><span class="o">))</span> <span class="o">{</span>
</span><span class='line'> <span class="n">log</span><span class="o">.</span><span class="na">info</span><span class="o">(</span><span class="s">"[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}"</span><span class="o">,</span>
</span><span class='line'> <span class="n">consumerGroup</span><span class="o">,</span>
</span><span class='line'> <span class="n">currentCID</span><span class="o">,</span>
</span><span class='line'> <span class="n">cidAll</span><span class="o">);</span>
</span><span class='line'> <span class="k">return</span> <span class="n">result</span><span class="o">;</span>
</span><span class='line'> <span class="o">}</span>
</span><span class='line'> <span class="c1">//END: 一些前置的判断</span>
</span><span class='line'>
</span><span class='line'> <span class="c1">//核心分配逻辑开始</span>
</span><span class='line'> <span class="kt">int</span> <span class="n">index</span> <span class="o">=</span> <span class="n">cidAll</span><span class="o">.</span><span class="na">indexOf</span><span class="o">(</span><span class="n">currentCID</span><span class="o">);</span>
</span><span class='line'> <span class="kt">int</span> <span class="n">mod</span> <span class="o">=</span> <span class="n">mqAll</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">%</span> <span class="n">cidAll</span><span class="o">.</span><span class="na">size</span><span class="o">();</span>
</span><span class='line'> <span class="kt">int</span> <span class="n">averageSize</span> <span class="o">=</span> <span class="n">mqAll</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o"><=</span> <span class="n">cidAll</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">?</span> <span class="mi">1</span> <span class="o">:</span> <span class="o">(</span><span class="n">mod</span> <span class="o">></span> <span class="mi">0</span> <span class="o">&&</span> <span class="n">index</span> <span class="o"><</span> <span class="n">mod</span> <span class="o">?</span> <span class="n">mqAll</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">/</span> <span class="n">cidAll</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">+</span> <span class="mi">1</span> <span class="o">:</span> <span class="n">mqAll</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">/</span> <span class="n">cidAll</span><span class="o">.</span><span class="na">size</span><span class="o">());</span><span class="c1">//平均分配,每个cid分配多少队列</span>
</span><span class='line'> <span class="kt">int</span> <span class="n">startIndex</span> <span class="o">=</span> <span class="o">(</span><span class="n">mod</span> <span class="o">></span> <span class="mi">0</span> <span class="o">&&</span> <span class="n">index</span> <span class="o"><</span> <span class="n">mod</span><span class="o">)</span> <span class="o">?</span> <span class="n">index</span> <span class="o">*</span> <span class="n">averageSize</span> <span class="o">:</span> <span class="n">index</span> <span class="o">*</span> <span class="n">averageSize</span> <span class="o">+</span> <span class="n">mod</span><span class="o">;</span> <span class="c1">//从哪里开始分配,分配的位点index是什么。</span>
</span><span class='line'> <span class="kt">int</span> <span class="n">range</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">min</span><span class="o">(</span><span class="n">averageSize</span><span class="o">,</span> <span class="n">mqAll</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">-</span> <span class="n">startIndex</span><span class="o">);</span><span class="c1">//真正分配的数量,避免除不尽的情况(实际上,有除不尽的情况)</span>
</span><span class='line'>
</span><span class='line'> <span class="c1">//开始分配本cid应该拿的队列列表</span>
</span><span class='line'> <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="n">range</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
</span><span class='line'> <span class="n">result</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">mqAll</span><span class="o">.</span><span class="na">get</span><span class="o">((</span><span class="n">startIndex</span> <span class="o">+</span> <span class="n">i</span><span class="o">)</span> <span class="o">%</span> <span class="n">mqAll</span><span class="o">.</span><span class="na">size</span><span class="o">()));</span>
</span><span class='line'> <span class="o">}</span>
</span><span class='line'> <span class="k">return</span> <span class="n">result</span><span class="o">;</span>
</span><span class='line'><span class="o">}</span>
</span></code></pre></td></tr></table></div></figure>
<h2>Rebalance是怎么对多Topic做分配</h2>
<p>细心地你可能会提一个问题,上面的提到的策略分配接口里,没有Topic的订阅关系的信息,那么如果一个消费者组订阅了topic1也订阅了topic2,topic下的队列数量可能是不一样的,那么最后分配的结果肯定也是不同的,那么怎么分配的呢?</p>
<p>答案是:一次topic的分配就单独调用一次分配接口,每次rebalance,实际上都会被RebalanceImpl里的rebalanceByTopic调用,而每订阅一个topic就会调用rebalanceByTopic,从而触发一次上文讲到的分配策略</p>
<h2>Rebalance什么时候触发</h2>
<p>其实看完上文,我们已经知道RocketMQ客户端是怎么无中心地做队列分配的了。现在还有一个问题,就是这个触发时机是什么时候?</p>
<p>为什么触发时机很重要呢?试想一下,突然间假设有一个消费者实例扩容了,从4个变成5个。如果有一个实例以5个去做负载均衡,其他四个老消费者以为在线的消费者还是只有四个,最后分配的结果肯定是会有重复的(某些情况甚至会漏分配),所以这个“节奏”很重要。</p>
<p>简单地来说,RocketMQ有三个时机会触发负载均衡:</p>
<ol>
<li><p>启动的时候,会立即触发</p></li>
<li><p>有消费实例数量的变更的时候。broker在接受到消费者的心跳包的时候如果发现这个实例是新的实例的时候,会广播一个消费者数量变更的事件给所有消费者实例;同理,当发现一个消费者实例的连接断了,也会广播这样的一个事件</p></li>
<li>定期触发(默认20秒)。</li>
</ol>
<p>第一个时机很好理解。启动的时候,消费者需要需要知道自己要分配什么队列,所以要触发Rebalance。</p>
<p>第二个时机实际也很好理解。因为有实例的数量变更,所以分配的结果肯定也需要调整的,这时候就要广播给各消费者。</p>
<p>第三点定期触发的原因实际上是一个补偿机制,为了避免第二点广播的时候因为网络异常等原因丢失了重分配的信号,或者还有别的场景实际上也需要重新计算分配结果(例如队列的数量变化、权限变化),所以需要一个定时任务做补偿。</p>
<p>从以上的触发时机可以看出,大部分情况下,消费者实例应该都是“节奏一致的”,如果出现异常场景或某些特殊场景,也会因为定时任务的补偿而达到最终一致的状态。所以如果你发现消费者分配有重复/漏分,很有可能这个消费者有短暂异常,没有及时地触发Rebalance,这个也可以从客户端日志中看出问题以便具体排查:如果一个消费者负载均衡后发现自己的分配的队列发生了变化:会有类似的日志(每一个Topic都会单独打印):</p>
<figure class='code'><figcaption><span></span></figcaption><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
</pre></td><td class='code'><pre><code class='java'><span class='line'><span class="n">rebalanced</span> <span class="n">result</span> <span class="n">changed</span><span class="o">.</span> <span class="n">allocateMessageQueueStrategyName</span><span class="o">=</span><span class="n">AVG</span><span class="o">,</span> <span class="n">group</span><span class="o">=</span><span class="n">my</span><span class="o">-</span><span class="n">consumer</span><span class="o">,</span> <span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">clientId</span><span class="o">=</span><span class="mf">10.22</span><span class="o">.</span><span class="mf">224.39</span><span class="err">@</span><span class="mi">114452</span><span class="o">,</span> <span class="n">mqAllSize</span><span class="o">=</span><span class="mi">9</span><span class="o">,</span> <span class="n">cidAllSize</span><span class="o">=</span><span class="mi">1</span><span class="o">,</span> <span class="n">rebalanceResultSize</span><span class="o">=</span><span class="mi">9</span><span class="o">,</span> <span class="n">rebalanceResultSet</span><span class="o">=[</span><span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">1</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">2</span><span class="o">],</span> <span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">1</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">1</span><span class="o">],</span> <span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">2</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">2</span><span class="o">],</span> <span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">3</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">0</span><span class="o">],</span> <span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">1</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">0</span><span class="o">],</span> <span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">2</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">1</span><span class="o">],</span> <span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">3</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">2</span><span class="o">],</span> <span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">2</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">0</span><span class="o">],</span> <span class="n">MessageQueue</span> <span class="o">[</span><span class="n">topic</span><span class="o">=</span><span class="n">topic_event_repay</span><span class="o">,</span> <span class="n">brokerName</span><span class="o">=</span><span class="n">broker</span><span class="o">-</span><span class="mi">3</span><span class="o">,</span> <span class="n">queueId</span><span class="o">=</span><span class="mi">1</span><span class="o">]]</span>
</span></code></pre></td></tr></table></div></figure>
<p>从而判断是否及时地触发了负载均衡。</p>
<p>注:虽然每次Rebalance都会触发,但是如果重新分配后发现和原来已分配的队列是一致的,并不会有实际的重排动作。如:上次分配的是q0,q1,这次分配的也是q0,q1意味着整体的外部状态并没有修改,是不会有真正的重排动作的,这时候在日志上并不会有所表现。</p>
<h2>Rebalance可能会到来消息的重复</h2>
<p>实际上,Rebalance如果真的发现前后有变化(重排),这是一个很重的操作。因为它需要drop掉当前分配的队列以及其中的任务,还需要同步消费进度等。<strong>而由于这个过程比较长,且很可能每个消费者实际drop队列和分配队列是不一致的,所以通常情况下,重排都意味着有消息的重复投递。</strong>所以消费者端必须要做好消费的幂等。</p>
<p>我们不妨假设这样一个分配过程:A1本来拥有q0,这次重排需要拿q1,A2本来拥有q1,这次重排不需要q1了。那么对于A2来说,他首先要做的是:把q1的任务中断(drop队列),然后在合适的时机把q1的消费进度同步一下,再重新分配(这个例子这里不太重要),同样的A1也是要经历一样的过程:把q0的任务中断(drop队列),然后在合适的时机把q0的消费进度同步一下,然后重新分配——拿到q1。</p>
<p>我们假设A1的过程比A2要快,这里有两个可能:</p>
<p>1.一种情况是A1在A2把q1队列drop掉之前,A1就又拿到了q1,所以在这个时间窗口上观察,你会发现q1短暂地同时分配给了A1和A2。而由于RocketMQ的消费模型是Pull模式,所以A1、A2会同时拉取消息,消息就重复了。</p>
<p>2.另一种情况可能性更大,A2的确drop掉了队列不拉取了,但是消费进度(假设为OF1)还没及时同步到broker。那么A1拿到了q1之后,他需要第一时间知道自己从哪里(位点)拉取消息,所以他会询问一次broker,而broker这时候他的信息也是落后的,就会返回一个较老的消息位点OF2,那么[OF2,OF1]之间的消息就会重复。</p>
<p>可以看到,光负载均衡的这个实现原理,就会导致RocketMQ消息重复比一般的消息中间件概率要大,而且严重不少(消息是批量重复的)。</p>
</div>
</article>
<article>
<header>
<h1 class="entry-title"><a href="/blog/2020/06/08/rocketmq-message-dedup/">消息幂等(去重)通用解决方案,RocketMQ</a></h1>
<p class="meta">
<time class='entry-date' datetime='2020-06-08T15:37:53+08:00'><span class='date'>2020-06-08 Mon</span> <span class='time'>15:37</span></time>
</p>
</header>
<div class="entry-content"><p>消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。</p>
<p>举个例子,一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。</p>
<p>然而这种可靠的特性导致,消息可能被多次地投递。举个例子,还是刚刚这个例子,程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。</p>
<p>这在RockectMQ的场景来看,就是同一个messageId的消息重复投递下来了。</p>
<p>基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么RocketMQ的文档里强调的,消费逻辑需要自我实现幂等。背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。</p>
<h2>简单的消息去重解决方案</h2>
<p>例如:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存:</p>
<figure class='code'><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
</pre></td><td class='code'><pre><code class=''><span class='line'>insert into t_order values .....
</span><span class='line'>update t_inv set count = count-1 where good_id = 'good123';</span></code></pre></td></tr></table></div></figure>
<p>要实现消息的幂等,我们可能会采取这样的方案:</p>
<figure class='code'><div class="highlight"><table><tr><td class="gutter"><pre class="line-numbers"><span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<span class='line-number'>4</span>
<span class='line-number'>5</span>
<span class='line-number'>6</span>