forked from wangxingjian2015/monthly-technology
-
Notifications
You must be signed in to change notification settings - Fork 0
/
201808.txt
1060 lines (943 loc) · 97 KB
/
201808.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
Spring Cloud Kubernetes provide Spring Cloud common interface implementations that consume Kubernetes native services. The main objective of the projects provided in this repository is to facilitate the integration of Spring Cloud and Spring Boot applications running inside Kubernetes.
Spring Cloud Kubernetes Features:
- Kubernetes awareness.
- DicoveryClient implementation.
- PropertySource objects configured via ConfigMaps.
- Client side loadbalancing via Netflix Ribbon.
ConfigMapPropertySource, SecretsPropertySource
YGC的时机:
- Eden空间不足时
FGC的时机
- Old空间不足时
- Perm空间不足时
- 显示调用System.gc(),包括RMI等的定时触发
- YGC时的悲观策略
- dump live内存信息时(jmap - dump:live)
- 通过设置JVM参数,定时执行FGC
YGC的悲观策略: 它触发的机制是首先会计算之前晋升的平均大小,也就是从新生代通过YGC变成老年代的平均大小,然后若干老年代剩余的空间小于晋升大小,那么就会触发一次FullGC。JVM考虑的策略是,从平均和长远的情况来看,下次晋升空间不够的可能性非常大,与其等到那个时候FGC,不然悲观的认为下次肯定会触发FGC,直接先执行一次FGC。而且从实际使用过程来看,也达到了比较稳定的效果。
---------------------------------------------
JMS Transaction management:
Spring provides a JmsTransactionManager that manages transactions for a single JMS ConnectionFactory . This allows JMS applications to leverage the managed transaction features of Spring as described in Chapter 17, Transaction Management. The JmsTransactionManager performs local resource transactions, binding a JMS Connection/Session pair from the specified ConnectionFactory to the thread. JmsTemplate automatically detects such transactional resources and operates on them accordingly.
In a Java EE environment, the ConnectionFactory will pool Connections and Sessions, so those resources are efficiently reused across transactions. In a standalone environment, using Spring’s SingleConnectionFactory will result in a shared JMS Connection , with each transaction having its own independent Session . Alternatively, consider the use of a provider-specific pooling adapter such as ActiveMQ’s PooledConnectionFactory class.
JmsTemplate can also be used with the JtaTransactionManager and an XA-capable JMS ConnectionFactory for performing distributed transactions. Note that this requires the use of a JTA transaction manager as well as a properly XA-configured ConnectionFactory! (Check your Java EE server’s / JMS provider’s documentation.)
Reusing code across a managed and unmanaged transactional environment can be confusing when using the JMS API to create a Session from a Connection . This is because the JMS API has only one factory method to create a Session and it requires values for the transaction and acknowledgement modes. In a managed environment, setting these values is the responsibility of the environment’s transactional infrastructure, so these values are ignored by the vendor’s wrapper to the JMS Connection. When using the JmsTemplate in an unmanaged environment you can specify these values through the use of the properties sessionTransacted and sessionAcknowledgeMode. When using a PlatformTransactionManager with JmsTemplate , the template will always be given a transactional JMS Session.
---------------------------------------------
当并发访问或更新数据库时,有可能出现脏读(Dirty Read)、不可重复读(Unrepeatable Read)、幻读(Phantom Read)、更新丢失(Lost update)等数据不一致情况,为了解决这些问题,MySQL引入了多种锁的概念。
MySQL InnoDB对数据行的锁定类型一共有四种:共享锁(读锁,S锁), 排他锁(写锁,X锁), 意向共享锁(IS锁)和意向排它锁(IX锁)。
MySQL支持三种锁定方式:
- 行锁:Record Lock, 锁直接加在索引记录上面;
- 间隙锁:Gap Lock, 锁加在不存在的空闲空间上,可以是两个索引记录之间,也可能是第一个索引记录之前或最后一个索引之后的空间。
- Next-Key Lock: 行锁与间隙锁组合起来用就叫做Next-Key Lock。
默认情况下,InnoDB工作在可重复读隔离级别下(Reaptable-Read),并且以Next-Key Lock的方式对数据进行加锁,这样可以有效防止幻读的发生。Next-Key Lock是行锁和间隙锁的组合,当InnoDB扫描索引记录的时候,会首先对选中的索引记录加上行锁(Record Lock),再对索引记录两边的间隙加上间隙锁(Gap Lock)。当一个间隙被事务Transaction加了锁,其他事务是不能在这个间隙插入记录的。
MySQL锁的实现:
- 在可重复读级别下,InnoDB以Next-Key Lock的方式对索引加锁;在读已提交级别下,InnoDB以Index-Record Lock的方式对索引加锁;
- 被加锁的索引如果不是聚簇索引,那被加锁的索引所指向的聚簇索引以及其他指向相同聚簇索引的索引也会被加锁。
- select * from ... lock in share mode对索引加共享锁;select * from ... for update对索引加排他锁。
- select * from ... 是非阻塞读,不会对索引加锁(除了Serializable级别)。在读已提交级别下,总是查询记录的最新、有效的版本;在可重复读级别下,会记住第一次查询时的版本,之后的查询会基于该版本。例外的情况是在串行化隔离级别下,这时会以Next-Key Lock的方式对索引加共享锁。
- update ... where 和 delete ... where对索引加排他锁。
- insert into ...以Index-Record Lock的方式对索引加排它锁。
使用select ... for update会把数据给锁住,不过需要注意一些锁的级别,MySQL InnoDB默认Row-Level Lock,所以只有明确地指定主键/索引,MySQL才会执行Row Lock(只锁住所选取的数据),否则MySQL将会执行Table Lock(将整个表给锁住)。
悲观锁的优点:
- 悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。
- 悲观锁基于DB层面实现,对业务代码无入侵,使用方便
悲观锁的缺点:
- 悲观锁适用于可靠的持续性连接,诸如C/S应用。 对于Web应用的HTTP连接,先天不适用
- 锁的使用意味着性能的损耗,在高并发、锁定持续时间长的情况下,尤其严重。Web应用的性能瓶颈多在数据库处,使用悲观锁,进一步收紧了瓶颈
- 非正常中止情况下的解锁机制,设计和实现起来很麻烦,成本还很高
- 不够严谨的设计下,可能产生莫名其妙的,不易被发现的死锁问题
- 悲观的缺陷是不论是页锁还是行锁,加锁的时间可能会很长,这样可能会长时间的限制其他用户的访问,也就是说悲观锁的并发访问性不好
线程池如何调优,最大数目如何确认:对于线程池大小的设定,需要考虑以下问题:
- CPU个数
- 内存大小
- 任务类型,是计算密集型(CPU密集型)还是IO密集型
- 是否需要一些稀缺资源,像数据库连接这种
- 其他情况
有个简单的估算方式,设N为CPU个数
- 对于CPU密集型的应用,线程池的大小设置为N + 1;
- 对于IO密集型的应用,线程池的大小设置为2N + 1;
这种设置方式适合于单台机器上的应用的类型是单一的,并且只有一个线程池,实际情况还需要根据实际情况进行验证。
在IO优化中,下面的估算公式会更合理:
最佳线程数量 = ((线程等待时间 + 线程CPU时间) / 线程CPU时间) * CPU个数
基于MQ的最终一致性分布式事务如何保证高可用性:
1. 生产者:落库(状态为待发送) + 发送MQ,发送成功更改状态为发送完成。如果发送失败,回滚数据库。如果更改状态失败,后续通过异步轮询线程查询状态为待发送的记录,重新发送,消费者需要保证幂等性;
2. MQ:保证高可用、高并发
3. 消费者:保证消费的幂等性;消费不成功时,不要提交offset。
4. 对MQ的消息堆积进行监控,查看是否有没消费的消息;同时设置死信队列,排查最终没有消费的消息,排查后将消息重新投递到原队列。
5. MQ不可用后的降级方案:1. 封装MQ客户端,直接改为同步调用;2. 封装MQ客户端,使用Redis作为MQ的替代;3. 其他类似MQ的替代品。考虑点:如果切换降级方案,比如某单位时间内失败N次后,生产者通过Zookeeper设置降级标记。消费者通过监听这个Zookeeper的标记,恰当的时机进行切换(比如尽量把之前的消息消费完毕)。同时在熔断后,在某个时间内,尝试一次发往MQ,如果成功,关闭降级开关。否则继续降级。这个可以参考Hystrix的做法。
Spring Cloud Alibaba Sentinel 1.6.0引入了Sentinel API Gateway Adapter Common模块,此模块中包含网关限流的规则和自定义API的实体和管理逻辑:
- GatewayFlowRule: 网关限流规则,针对API Gateway的场景定制的限流规则,可以针对不同route或自定义的API分组进行限流,支持对请求中的参数、Header、来源IP等进行定制化的限流。
- ApiDefinition: 用户自定义的API分组,可以看作是一些URL匹配的组合。比如可以定义一个API叫my_api,请求path模式为/foo/**和/baz/**的都归属到my_api这个API分组下面。限流的时候可以针对这个分组维度进行限流。
spring.main.allow-bean-definition-overriding=true
jcmd可以查看当前所有java进程,比jps详细一点。
jhat是jdk内置的工具之一,主要用来分析java堆,可以将堆中的对象以html的形式显示出来,包括对象的数量、大小等,并支持对象查询语言(Object Query Language)。可以使用如下步骤:
第一步:导出堆,#jmap -dump:live,file=xxx.dump <pid>
除了使用jmap还可以通过下面方式获取堆转储文件
- 使用jconsole,通过HotSpotDiagnosticMXBean从运行时获得堆转储(生成dump文件)
- JVM启动时如果指定了-XX:+HeapDumpOnOutOfMemoryError,则在抛出OutOfMemoreyError时,自动进行堆转储
- hprof
第二步:分析堆文件;jhat -J-Xmx512m xxx.dump , 如文件很大,可以增大mx参数
对于jhat启动后显示的html页面中提供的功能如下:
1. All classes (excluding platform)
2. All classed (including platform)
3. Show all members of rootset
- Java Static References
- Java Local References
- Busy Monitor References
- JNI Global References
- System Class References
4. Show instance counts for all classes(including platform)
5. Show instance counts for all classes(excluding platform)
6. Show heap histogram
7. Show finalizer summary
8. Execute Object Query Language (OQL) query
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
default void forEachRemaining(Consumer<? super T> action) {
do {} while (tryAdvance(action));
}
Spliterator<T> trySplit();
long estimateSize();
default long getExactSizeIfKnown() {
return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
}
int characteristics();
default boolean hasCharacteristics(int characteristics) {
return (characteristics() & characteristics) == characteristics;
}
default Comparator<? super T> getComparator() {
throw new IllegalStateException();
}
public static final int ORDERED = 0x00000010;
public static final int DISTINCT = 0x00000001;
public static final int SORTED = 0x00000004;
public static final int SIZED = 0x00000040;
public static final int NONNULL = 0x00000100;
public static final int IMMUTABLE = 0x00000400;
public static final int CONCURRENT = 0x00001000;
public static final int SUBSIZED = 0x00004000;
public interface OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
extends Spliterator<T> {
@Override
T_SPLITR trySplit();
@SuppressWarnings("overloads")
boolean tryAdvance(T_CONS action);
@SuppressWarnings("overloads")
default void forEachRemaining(T_CONS action) {
do {} while (tryAdvance(action));
}
}
public interface OfInt extends OfPrimitive<Integer, IntConsumer, OfInt> {
@Override
OfInt trySplit();
@Override
boolean tryAdvance(IntConsumer action);
@Override
default void forEachRemaining(IntConsumer action) {
do {} while (tryAdvance(action));
}
@Override
default boolean tryAdvance(Consumer<? super Integer> action) {
if (action instanceof IntConsumer) {
return tryAdvance((IntConsumer)action);
} else {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} calling Spliterator.OfInt.tryAdvance((IntConsumer) action::accept)");
return tryAdvance((IntConsumer) action::accept);
}
}
@Override
default void forEachRemaining(Consumer<? super Integer> action) {
if (action instanceof IntConsumer) {
forEachRemaining((IntConsumer)action);
} else {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} calling Spliterator.OfInt.forEachRemaining((IntConsumer) action::accept)");
forEachRemaining((IntConsumer)action::accept);
}
}
}
public interface OfLong extends OfPrimitive<Long, LongConsumer, OfLong> {
@Override
OfLong trySplit();
@Override
boolean tryAdvance(LongConsumer action);
@Override
default void forEachRemaining(LongConsumer action) {
do { } while (tryAdvance(action));
}
@Override
default boolean tryAdvance(Consumer<? super Long> action) {
if (action instanceof LongConsumer) {
return tryAdvance((LongConsumer) action);
}
else {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} calling Spliterator.OfLong.tryAdvance((LongConsumer) action::accept)");
return tryAdvance((LongConsumer) action::accept);
}
}
@Override
default void forEachRemaining(Consumer<? super Long> action) {
if (action instanceof LongConsumer) {
forEachRemaining((LongConsumer) action);
}
else {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} calling Spliterator.OfLong.forEachRemaining((LongConsumer) action::accept)");
forEachRemaining((LongConsumer) action::accept);
}
}
}
/**
* A Spliterator specialized for {@code double} values.
* @since 1.8
*/
public interface OfDouble extends OfPrimitive<Double, DoubleConsumer, OfDouble> {
@Override
OfDouble trySplit();
@Override
boolean tryAdvance(DoubleConsumer action);
@Override
default void forEachRemaining(DoubleConsumer action) {
do { } while (tryAdvance(action));
}
/**
* {@inheritDoc}
* @implSpec
* If the action is an instance of {@code DoubleConsumer} then it is
* cast to {@code DoubleConsumer} and passed to
* {@link #tryAdvance(java.util.function.DoubleConsumer)}; otherwise
* the action is adapted to an instance of {@code DoubleConsumer}, by
* boxing the argument of {@code DoubleConsumer}, and then passed to
* {@link #tryAdvance(java.util.function.DoubleConsumer)}.
*/
@Override
default boolean tryAdvance(Consumer<? super Double> action) {
if (action instanceof DoubleConsumer) {
return tryAdvance((DoubleConsumer) action);
}
else {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} calling Spliterator.OfDouble.tryAdvance((DoubleConsumer) action::accept)");
return tryAdvance((DoubleConsumer) action::accept);
}
}
/**
* {@inheritDoc}
* @implSpec
* If the action is an instance of {@code DoubleConsumer} then it is
* cast to {@code DoubleConsumer} and passed to
* {@link #forEachRemaining(java.util.function.DoubleConsumer)};
* otherwise the action is adapted to an instance of
* {@code DoubleConsumer}, by boxing the argument of
* {@code DoubleConsumer}, and then passed to
* {@link #forEachRemaining(java.util.function.DoubleConsumer)}.
*/
@Override
default void forEachRemaining(Consumer<? super Double> action) {
if (action instanceof DoubleConsumer) {
forEachRemaining((DoubleConsumer) action);
}
else {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} calling Spliterator.OfDouble.forEachRemaining((DoubleConsumer) action::accept)");
forEachRemaining((DoubleConsumer) action::accept);
}
}
}
}
当对表设计、SQL优化、索引优化、读写分离、缓存后,数据库的压力还是很大,这时就需要进行数据库拆分了。数据库拆分就是指通过某种特定的条件,按照某个维度,将存放在同一个数据库中的数据分散存放到多个数据库(主机)上,以达到分散单库(单机)负载的效果。分为:垂直拆分和水平拆分两种模式。
垂直拆分:专库专用,一个数据库由很多表构成,每个表对应着不同的业务,垂直拆分是指按照业务将表进行分类,分布到不同的数据库上,这样也就将数据或者说压力分担到不同的库。
优点:
1. 拆分后业务清晰,拆分规则明确。
2. 系统之间整合或扩展容易。
3. 数据维护简单。
缺点:
1. 部分业务表无法join,只能通过接口方式解决,提高了系统复杂度。
2. 受热点业务的限制存在单库性能瓶颈,不容易数据扩展和性能提高。
3. 事务处理复杂。
水平拆分:垂直拆分后遇到单库瓶颈,可以使用水平拆分。相对于垂直拆分的区别是:垂直拆分是把不同的表拆到不同的数据库里,而水平拆分是把同一个表拆分到不同的库。相对于垂直拆分,水平拆分不是将表的数据做分类,而是按照某个字段的某种规则来分散到多个库中,每个表中包含一部分数据。简单来说,水平拆分是按照数据行的切分,就是将表中的某些行切分到一个库,而另外的某些行切分到其他的库,主要有分库、分表两种模式。
优点:
1. 不存在单表大数据,高并发的性能瓶颈。
2. 对应用透明,应用端改造较少。
3. 按照合理拆分规则拆分,join操作基本避免跨库。
4. 提高了系统的稳定性和负载能力。
缺点:
1. 拆分规则难以抽象。
2. 事务一致性难以解决。
3. 数据多次扩展难度大,维护量大。
4. 跨库join性能差。
水平拆分和垂直拆分的共同缺点:
1. 引入了分布式事务的问题。
2. 跨节点join的问题。
3. 跨节点合并、排序、分页问题。
拆分原则:
1. 尽量不拆分,架构是进化而来,不是一蹴而就。(SOA)
2. 最大可能找到最佳的拆分维度。
3. 由于数据库中间件对数据join实现的优劣难以把握,而且难度较高,业务读取尽量少用join,尽量通过数据冗余、分组避免数据跨库多表join。
4. 尽量避免分布式事务。
5. 单表拆分到数据几百万以内。
JDK8中的HashMap是先插入后扩容,原因如下:
在final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict)函数中有代码片段:
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
源码很清楚的表达了扩容原因:调用put不一定是新增数据,还可能覆盖原来的数据,这里就存在一个key的比较问题。以先扩容为例,需要先判断是否是新增的数据,再判断新增数据后是否扩容,这样判断会比较浪费时间。而后扩容,判断不是新增数据,直接返回即可,不执行扩容,这样可以提高效率。
Kafka的缺点以及如何优化:
1. 不支持事务。不支持JMS
2. 只支持同一Partition内的消息有序,不支持消息全局有序。
3. 不支持Exactly Once语义。只支持At least once。
4. 复杂性。需要ZK支持管理元数据。
5. 由于是批量发送,数据并非真正实时。
6. 不支持延迟功能。
7. 监控不完善,需要安装插件。
8. 不支持MQTT协议
2PC的优缺点:
- 优点:原理简单,实现方便
- 缺点:同步阻塞、单点问题、脑裂(主从数据不一致)、保守(协调者超时机制判断是否要中断)
3PC(事务询问-CanCommit, 执行事务预提交-PreCommit, 执行提交-DoCommit, 回滚-Rollback)的优缺点:
- 优点:降低了参与者的阻塞范围,能够在单点故障后继续达成一致
- 缺点:在参与者接收到预提交消息后,如果网络出现分区,此时协调者所在节点和参与者无法进行网络通信,在这种情况下,该参与者依然会进行事务提交,这必然出现数据的不一致性。
Is Zab just a special implementation of Paxos?
No, Zab is a different protocol than Paxos, although it shares with it some key aspects, as for example:
- A leader proposes values to the followers
- Leaders wait for acknowledgements from a quorum of followers before considering a proposal committed(learned)
- Proposals include epoch numbers, which are similar to ballot numbers in Paxos
The main conceptual difference between Zab and Paxos is that it is primarily designed for primary-backup systems, like Zookeeper, rather than for state machine replication.
What is the difference between primary-backup and state machine replication?
A state machine is a software component that processes a sequence of requests. For every processed request, it can modify its internal state and produce a reply. A state machine is deterministic in the sense that, given two runs where it receives the same sequence of requests, it always makes the same internal state transitions and produces the same replies.
A state machine replication system is a client-sever system ensuring that each state machine replica executes the same sequence of client requests, even if these requests are submitted concurrently by clients and received in different orders by the replicas. Replicas agree on the execution order of client requests using a consensus algorithm like Paxos. Client requests that are sent concurrently and overlap in time can be executed in any order. If a leader fails, a new leader that executes recovery is free to arbitrarily reorder any uncommitted request since it is not yet completed.
In the case of primary-backup systems, such as Zookeeper, replicas agree on the application order of incremental (delta) state updates, which are generated by a primary replica and sent to its followers. Unlike client requests, state updates must be applied in the exact original generation order of the primary, starting from the original initial state of the primary. If a primary fails, a new primary that executes recovery cannot arbitrarily reorder uncommitted state updates, or apply them starting from a different initial state.
In conclusion, agreement on state updates (for primary-backup systems) requires stricter ordering guarantees than agreement on client requests (for state machine replication systems).
What are the implications for agreement algorithms?
Paxos can be used for primary-backup replication by letting the primary be the leader. The problem with Paxos is that, if a primary concurrently proposes multiple state updates and fails, the new primary may apply uncommitted updates in an incorrect order. An example is presented in our DSN 2011 paper (Figure 1). In the example, a replica should only apply the state update B after applying A. The example shows that, using Paxos, a new primary and its follows may apply B after C, reaching an incorrect state that has not been reached by any of the previous primaries.
A workaround to this problem using Paxos is to sequentially agree on state updates: a primary proposes a state update only after it commits all previous state updates. Since there is at most one uncommitted update at a time, a new primary cannot incorrectly reorder updates. This approach, however, results in poor performance.
Zab does not need this workaround. Zab replicas can concurrently agree on the order of multiple state updates without harming correctness. This is achieved by adding one more synchronization phase during recovery compared to Paxos, and by using a different numbering of instances based on zxids.
ZK的应用:
1. 命名服务
2. 负载均衡
3. 分布式锁(排他锁和共享锁)
4. 配置管理
5. Leader选举
6. 分布式队列
7. 服务注册/发现
ZK的设计目的:
1. 最终一致性:client无论连接到哪个server,展示的都是同一个视图,这是ZK最重要的性能;
2. 可靠性:具有简单、健壮、良好的性能,如果消息被接受,那么它将被所有的server接受;
3. 实时性:ZK保证客户端将在一个时间间隔内获得服务器的更新信息,或者服务器失效的信息。但由于网络延迟原因,ZK不能保证两个客户端能同时得到刚刚更新的数据,如果需要最新数据,应该在读数据前调用sync()接口。
4. 等待无关(wait-free):慢的或者失效的client不得干预快速的client的请求,使得每个client都能有效的等待。
5. 原子性:更新只能成功或失败,没有中间状态。
6. 顺序性:包括全局有序和偏序两种。全局有序是指如果在一台服务器上消息A在消息B之前发布,则在所有Server上消息A都将在消息B之前被发布;偏序是指如果一个消息B在消息A后被同一个发送者发布,A必将排在B前面。
ZK的Follower主要有4个功能:
1. 向Leader发送请求(Ping消息、Request消息、Ack消息、Revalidate消息)
2. 接收Leader消息并进行处理
3. 接收Client消息,如果为写请求,发送给Leader处理
4. 返回Client结果
Follower的消息循环处理以下来自Leader的消息:
1. Ping消息:心跳消息
2. Proposal消息:Leader发起的提案,要求Follower投票
3. Commit消息:服务端最新一次提案的信息
4. Uptodate消息:表明同步完成
5. Revalidate消息:根据Leader的Revalidate结果,关闭待revalidate的session还是允许其接受消息
6. Sync消息:返回Sync结果给客户端,这个消息最初由客户端发起,用来强制得到最新的更新
ZK的Observer: Zookeeper需保证高可用和强一致性;为了支持更多的客户端,需要增加更多Server,Server增多,投票延迟增大,影响性能;权衡伸缩性和高吞吐率,引入Observer;Observer不参与投票;Observer接受客户端的连接,并将写请求转发给Leader节点;加入更多Observer节点,提高伸缩性,同时不影响吞吐率。
ZK的Server数目一般为奇数(3、5、7等),如果有3个,则最多允许1个挂掉;如果有4个,则同样最多允许1个挂掉;由此,3个和4个Server的容灾能力是一样的,所以为了节省服务器资源,一般采用奇数个数,作为服务器部署个数。
Zookeeper的临时节点不能创建子节点。
你也可以想象 Zookeeper 维护了两条监视链表:数据监视和子节点监视(data watches and child watches) getData() and exists() 设置数据监视,getChildren() 设置子节点监视。或者,你也可以想象 Zookeeper 设置的不同监视返回不同的数据,getData() 和 exists() 返回 znode节点的相关信息,而 getChildren() 返回子节点列表。因此, setData()会触发设置在某一节点上所设置的数据监视(假定数据设置成功),而一次成功的 create()操作则会出发当前节点上所设置的数据监视以及父节点的子节点监视。一次成功的 delete()操作将会触发当前节点的数据监视和子节点监视事件,同时也会触发该节点父节点的child watch。
Zookeeper中的监视是轻量级的,因此容易设置、维护和分发。当客户端与 Zookeeper服务器端失去联系时,客户端并不会收到监视事件的通知,只有当客户端重新连接后,若在必要的情况下,以前注册的监视会重新被注册并触发,对于开发人员来说这通常是透明的。只有一种情况会导致监视事件的丢失,即:通过 exists() 设置了某个 znode 节点的监视,但是如果某个客户端在此znode 节点被创建和删除的时间间隔内与 zookeeper 服务器失去了联系,该客户端即使稍后重新连接zookeeper服务器后也得不到事件通知。
序列化协议:
1. Thrift
2. 自定义协议(TLV协议, 8583协议)
3. Avro
4. FastJson
5. FST
6. Gson
7. Hessian2
8. java原生
9. Kryo
10. Protobuf
11. Xml
12. Jboss marshaling
13. MessagePack
选择标准
1. 是否跨语言;
2. 性能:序列化性能和反序列化性能;
3. 和现有框架的集成度(Spring, RPC);
4. 可读性(可调试性);
5. 是否具备兼容性(对于数据字段的增删)、扩展性;
6. 序列化后的空间、网络占用带宽(是否支持压缩编码);
7. 是否支持丰富的数据类型;
FST是完全兼容JDK序列化协议的序列化框架,序列化速度大概是JDK的4-10倍,大小是JDK大小的1/3左右。
不同的场景适用的序列化协议:
- 对于公司间的系统调用,如果性能要求在100ms以上的服务,基于XML的SOAP协议是一个值得考虑的方案。
- 基于Web browser的Ajax,以及Mobile app与服务端之间的通讯,JSON协议是首选。对于性能要求不太高,或者以动态类型语言为主,或者传输数据载荷很小的的运用场景,JSON也是非常不错的选择。
- 对于调试环境比较恶劣的场景,采用JSON或XML能够极大的提高调试效率,降低系统开发成本。
- 当对性能和简洁性有极高要求的场景,Protobuf,Thrift,Avro之间具有一定的竞争关系。
- 对于T级别的数据的持久化应用场景,Protobuf和Avro是首要选择。如果持久化后的数据存储在Hadoop子项目里,Avro会是更好的选择。
- 由于Avro的设计理念偏向于动态类型语言,对于动态语言为主的应用场景,Avro是更好的选择。
- 对于持久层非Hadoop项目,以静态类型语言为主的应用场景,Protobuf会更符合静态类型语言工程师的开发习惯。
- 如果需要提供一个完整的RPC解决方案,Thrift是一个好的选择。
- 如果序列化之后需要支持不同的传输层协议,或者需要跨防火墙访问的高性能场景,Protobuf可以优先考虑。
如何在发布时,新服务起来之后,不把请求发往老服务?
1. 对于有LB的服务,在LB端进行流量分发;
2. 对于软负载Ribbon,手工设置老服务下线。
相关问题:
1. 走eureka的滚动发布/蓝绿发布/灰度发布,也就是客户端负载均衡的情况
2. 不走eureka的滚动发布/蓝绿发布/灰度发布,也就是服务器端负载均衡的情况
3. 如何定义一个发布单元:将需要打包发布的服务打包为一个发布单元,进行整体分布。定义依赖关系,自动检测循环依赖关系。
4. 滚动发布/蓝绿发布/灰度发布如何控制除了RPC之外的调用,比如Kafka消息通讯。也就是非Client/Server的负载均衡情况。
5. 如何解决新老服务之间同时运行期间造成的不一致。
6. 新服务不可用时,如何正确回滚到老服务,包括代码、数据库、MQ等。
7. 如何在引入流量期间能够快速全面的发现问题:包括更细粒度的业务健康度监控、DB层面的监控等;另外,线上自动化测试验证,也能帮助我们在引流阶段发现由于线上线下环境差异而带来的问题。
8. 大规模自动化应用部署能力:全应用并行发布,颠覆以往顺序发布的模式,对发布平台的资源分发等方面的性能也提出了较大挑战。
9. 如何提供更细粒度的流量调度能力。
Instrumentation:
利用Java代码,即java.lang.instrument做动态Instrumentation是Java SE 5的新特性,它把Java的instrument功能从本地代码中解放出来,使之可以用Java代码的方式解决问题。使用Instrumentation,开发者可以构建一个独立于应用程序的代理程序(Agent),用来监测和协助运行在JVM上的程序,甚至能够替换和修改某些类的定义。有了这样的功能,开发者就可以实现更为灵活的运行时虚拟机监控和Java类操作了,这样的特性实际上提供了一种虚拟机级别支持的AOP实现方式,使得开发者无需对JDK做任何升级和改动,就可以实现某些AOP的功能了。
在Java SE6中,instrumentation包被赋予了更强大的功能;启动后的instrument、本地代码(native code)、以及动态改变classpath等等。这些改变,意味着具有了更强的动态控制、解释能力,它使得Java语言变得更加灵活多变。
java.lang.instrument包的具体实现,依赖于JVMTI。JVMTI(Java Virtual Machine Tool Interface)是一套由Java虚拟机提供的,为JVM相关的工具提供的本地编程接口集合。JVMTI是从Java SE5开始引入,整合和取代了以前使用的Java Virtual Machine Profiler Interface(JVMPI)和Java Virtual Machine Debug Inteface(JVMDI),而在Java SE6中,JVMPI和JVMDI已经消失了。JVMTI提供了一套“代理”程序机制,可以支持第三方工具程序以代理的方式连接和访问JVM,并利用JVMTI提供的丰富的编程接口,完成很多跟JVM相关的功能。
Java SE6新特性,获取Instrumentation接口的实例有两种方法:
1. 当JVM以指示一个代理类的方式启动时,将传递给代理类的premain方法一个Instrumentation实例。
2. 当JVM提供某种机制在JVM启动之后某一时刻启动代理时,将传递给代理代码的agentmain方法一个Instrumentation实例。
Kafka怎么计算Lag?(注意read_uncommitted和read_committed状态下的不同)
Lag即consumer offset lag。可以通过下列三种方式进行定制监控(当然可以使用其他现有工具):
1. 利用官方类库:ConsumerOffsetChecker,适用于0.8.2.2版本,kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala; 缺点是该类是给命令行调用的,每调用一次,就new一次zkClient,对于监控来说,不太适合,需要改造一下,将zkClient抽取出来。
2. 利用官方库:ConsumerGroupCommand,0.8.2.2以上版本使用ConsumerGroupCommand替代了ConsumerOffsetChecker,kafka_2.11-0.10.2.1-sources.jar!/kafka/admin/ConsumerGroupCommand.scala
3. 直接利用kafka本身写入JMX的数据。
对于Kafka的LEO和HW而言,这两个都是指最后一条的下一条的位置而不是指最后一条的位置。
LSO是指LastStableOffset,不是LogStartOffset,它与具体的kafka事务有关。
Kafka的消费端isolation.level,这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为:read_uncommitted和read_committed,表示消费者所消费到的位置,如果设置为read_committed,那么消费者就会忽略事务未提交的消息,即只能消费到LSO(LastStableOffset)的位置,默认情况下为:read_uncommitted,即可以消费到HW(High Watermark)处的位置。注意: follower副本的事务隔离级别也为:read_uncommitted,并且不可修改。在开启Kafka事务时,生产者发送了若干消息(比如msg1、msg2、msg3)到broker中,如果生产者没有提交事务(执行commitTransaction),那么对于isolation.level = read_committed的消费者而言是看不到这些消息的,而isolation.level = read_uncommitted则可以看到。事务中的第一条消息的位置可以标记为firstUnstableOffset(也就是msg1的位置)。这个LSO还会影响Kafka消费滞后量(即Kafka Lag,也称为消息堆积量)的计算。
对每个分区而言,Lag等于HW-ConsumerOffset的值,其中ConsumerOffset表示当前的消费位移。当然这只是针对普通的情况。如果消息引入了事务,那么Lag的计算方式就会有所不同。
如果消费者客户端的isolation.level参数设置为read_uncommitted(默认情况),那么Lag的计算方式不受影响;如果这个参数配置为read_committed,那么就要引入LSO来进行计算了。对于未完成的事务而言,LSO的值等于事务中第一条消息的位置(firstUnstableOffset),对于已经完成的事务而言,它的值同HW相同,因此结论:LSO<=HW<=LEO。所以,对于分区中有未完成的事务,并且消费者客户端的isolation.level配置为read_committed的情况,它对应的Lag等于LSO-ConsumerOffset的值。
考虑情形:需要将静态内容(类似图片、文件)展示给用户。那么这个情形就意味着你需要先将静态内容从磁盘中拷贝出来放到一个内存buf中,然后将这个buf通过socket传输给用户,进而用户或者静态内容的展示。
在这个过程中,文件经历了4次Copy的过程:
1. 首先调用read时,文件拷贝到内核空间;
2. 之后CPU控制将内核空间的数据拷贝到用户空间;
3. 调用write时,先将用户空间下的数据拷贝到内核空间下的socket的buffer中;
4. 最后将内核空间下的socket buffer的数据copy到网卡设备中传送。
从上面的过程可以看到,数据白白从内核空间到用户空间走了一圈,浪费了2次copy(第一次,从内核空间拷贝到用户空间;第二次从用户空间再拷贝到内核空间,即上面的过程2和过程2).而且上面的过程中,内核空间和用户空间的上下文切换的次数是4次。
可用一种Zero-Copy的技术来节省无所谓的拷贝。应用程序可以用Zero-Copy来请求内核直接把磁盘的数据传输给socket,而不是经过应用程序传输。Zero-Copy大大提高了应用程序的性能,并且减少了内核和用户空间的上下文切换。
Zero-Copy技术省去了将操作系统的read buffer拷贝到程序的buffer,以及从程序buffer拷贝到socket buffer的步骤,直接将read buffer拷贝到socket buffer。Java NIO中的FileChannel.transferTo()方法就是这样实现的,底层依赖操作系统的sendFile()。
使用了Zero-Copy技术之后,整个过程如下:
1. transferTo()方法使得文件的内容直接拷贝到read buffer(内核空间)中;
2. 然后数据拷贝到socket buffer中;
3. 最后将socket buffer中的数据拷贝到网卡设备中传输。
这显然是比较的进步:这里将上下文切换次数从4次减少到2次,同时也把数据拷贝的次数从4次降低到3次。
但这并不是Zero-Copy。在Linux2.1内核引入了sendfile(socket, file, len)函数,该函数通过一次系统调用完成了文件的传送,减少了原来的read/write的上下文切换。通过sendfile传输文件只需要一次系统调用,如下:
1. 首先(通过DMA)将数据从磁盘读取到内核buffer中;
2. 然后将内核buffer拷贝到socket buffer;
3. 最后将socket buffer中的数据拷贝到网卡设备发送。
sendfiel与read/write模式相比,少了一次copy。但是从上述过程中也可以发现从kernel buffer中将数据copy到socket buffer是没有必要的。
Linux2.4内核对sendfile()做了改进。改进后的处理过程如下:
1. 将文件拷贝到内核buffer;
2. 向socket buffer中追加当前要发生的数据在内核buffer中的位置和偏移量;
3. 根据socket buffer中的位置和偏移量直接将内核buffer中的数据拷贝到网卡设备。
经过上述过程,数据只经过了2次拷贝就从磁盘传输出去了。这才是真正的Zero-Copy(这里的零拷贝是针对内核来说,数据在内核模式下是Zero-Copy)。正是Linux2.4的内核做了改进,Java中的TransferTo()实现了Zero-Copy。
Kafka的AR、ISR、OSR、LEO、HW这些元信息都保存在Zookeeper中。
最开始所有的副本都在ISR中,在Kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,则被剔除ISR存入OSR,如果后续速度恢复可以重新加入ISR中。
HW的截断机制:如果leader宕机,选出了新的leader,而新的leader并不能保证已经完全同步了之前leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower都要将数据截断到HW的位置,再和新的leader同步数据,来保证数据一致。当宕机的leader恢复,发现新的leader中的数据和自己持有的数据不一致,此时宕机的leader会将自己的数据截断到宕机之前的hw位置,然后同步新leader的数据。宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。
Redis支持三种集群方式:主从复制、哨兵模式和集群模式。
主从复制原理:
- 从服务器连接主服务器,发送SYNC命令;
- 主服务器接收到SYNC命名后,开始执行BGSAVE命令生成RDB文件并使用缓冲区记录此后执行的所有写命令;
- 主服务器BGSAVE执行完后,向所有从服务器发送快照文件,并在发送期间继续记录被执行的写命令;
- 从服务器收到快照文件后丢弃所有旧数据,载入收到的快照;
- 主服务器快照发送完毕后开始向从服务器发送缓冲区中的写命令;
- 从服务器完成对快照的载入,开始接收命令请求,并执行来自主服务器缓冲区的写命令;(从服务器初始化完成)
- 主服务器每执行一个写命令就会向Slave发送相同的写命令,从服务器接收并执行收到的写命令(从服务器初始化完成后的操作)
主从复制的优点:
- 支持主从复制,主机会自动将数据同步到从机,可以进行读写分离
- 为了分载Master的读操作压力,Slave服务器可以为客户端提供只读操作的服务,写服务仍然必须由Master来完成
- Slave同样可以接受其它Slaves的连接和同步请求,这样可以有效的分载Master的同步压力。
- Master Server是以非阻塞的方式为Slaves提供服务。所以在Master-Slave同步期间,客户端仍然可以提交查询或修改请求。
- Slave Server同样是以非阻塞的方式完成数据同步。在同步期间,如果有客户端提交查询请求,Redis则返回同步之前的数据
哨兵模式:当主服务器中断服务后,可以将一个从服务器升级为主服务器,以便继续提供服务,但是这个过程需要人工手动来操作。 为此,Redis 2.8中提供了哨兵工具来实现自动化的系统监控和故障恢复功能。
哨兵的作用就是监控Redis系统的运行状况。它的功能包括以下两个。
(1)监控主服务器和从服务器是否正常运行。
(2)主服务器出现故障时自动将从服务器转换为主服务器。
哨兵的工作方式:
- 每个Sentinel(哨兵)进程以每秒钟一次的频率向整个集群中的Master主服务器,Slave从服务器以及其他Sentinel(哨兵)进程发送一个 PING 命令。
- 如果一个实例(instance)距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值, 则这个实例会被 Sentinel(哨兵)进程标记为主观下线(SDOWN)
- 如果一个Master主服务器被标记为主观下线(SDOWN),则正在监视这个Master主服务器的所有 Sentinel(哨兵)进程要以每秒一次的频率确认Master主服务器的确进入了主观下线状态
- 当有足够数量的 Sentinel(哨兵)进程(大于等于配置文件指定的值)在指定的时间范围内确认Master主服务器进入了主观下线状态(SDOWN), 则Master主服务器会被标记为客观下线(ODOWN)
- 在一般情况下, 每个 Sentinel(哨兵)进程会以每 10 秒一次的频率向集群中的所有Master主服务器、Slave从服务器发送 INFO 命令。
- 当Master主服务器被 Sentinel(哨兵)进程标记为客观下线(ODOWN)时,Sentinel(哨兵)进程向下线的 Master主服务器的所有 Slave从服务器发送 INFO 命令的频率会从 10 秒一次改为每秒一次。
- 若没有足够数量的 Sentinel(哨兵)进程同意 Master主服务器下线, Master主服务器的客观下线状态就会被移除。若 Master主服务器重新向 Sentinel(哨兵)进程发送 PING 命令返回有效回复,Master主服务器的主观下线状态就会被移除。
哨兵模式的优点:
- 哨兵模式是基于主从模式的,所有主从的优点,哨兵模式都具有。
- 主从可以自动切换,系统更健壮,可用性更高。
哨兵模式的缺点:
- Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。
Redis Cluster集群:
redis的哨兵模式基本已经可以实现高可用,读写分离 ,但是在这种模式下每台redis服务器都存储相同的数据,很浪费内存,所以在redis3.0上加入了cluster模式,实现的redis的分布式存储,也就是说每台redis节点上存储不同的内容。
Redis-Cluster采用无中心结构,它的特点如下:
- 所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽。
- 节点的fail是通过集群中超过半数的节点检测失效时才生效。
- 客户端与redis节点直连,不需要中间代理层.客户端不需要连接集群所有节点,连接集群中任何一个可用节点即可。
Redis Cluster的工作方式:
在redis的每一个节点上,都有这么两个东西,一个是插槽(slot),它的的取值范围是:0-16383。还有一个就是cluster,可以理解为是一个集群管理的插件。当我们的存取的key到达的时候,redis会根据crc16的算法得出一个结果,然后把结果对 16384 求余数,这样每个 key 都会对应一个编号在 0-16383 之间的哈希槽,通过这个值,去找到对应的插槽所对应的节点,然后直接自动跳转到这个对应的节点上进行存取操作。
为了保证高可用,redis-cluster集群引入了主从模式,一个主节点对应一个或者多个从节点,当主节点宕机的时候,就会启用从节点。当其它主节点ping一个主节点A时,如果半数以上的主节点与A通信超时,那么认为主节点A宕机了。如果主节点A和它的从节点A1都宕机了,那么该集群就无法再提供服务了。
/* Defines a factory which can return an Object instance(possibly shared or independent) when invoked.
This interface is typically used to encapsulate a generic factory which returns a new instance (prototype) of some target object on each invocation.
This interface is similar to FactoryBean, but implementations of the latter are normally meant to be defined as SPI instance in a BeanFactory, while implementations of this class are normally meant to be fed as an API to other beans(through injection). As such, the getObject() method has different exception handling behavior.
*/
public interface ObjectFactory<T> {
T getObject() throws BeansException;
}
/* A variant of ObjectFactory designed specifically for injection points, allowing for programmatic optionality and lenient not-unique handling.
*/
public interface ObjectProvider<T> extends ObjectFactory<T> {
T getObject(Object... args) throws BeansException;
T getIfAvailable() throws BeansExcepiont;
T getIfUnique() throws BeansException;
}
Kafka的事务场景:
1. Producer发多条消息组成一个事务这些消息对Consumer同时可见或者不可见;
2. Producer可能会给多个topic、多个Partition发消息,这些消息也需要放在一个事务里面,这就形成了一个典型的分布式事务;
3. 具体应用场景先消费一个topic,然后做处理,再发送另一个topic,这种consume-transform-produce过程需要放在一个事务里面,比如在消息处理或者发送过程中失败了,消费位点也不能提交;
4. Producer或者Producer所在的应用可能会挂掉,新的Producer启动后需要知道怎么处理之前未完成的事务;
5. 流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致响应时间非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别。
Kafka使用事务的两种方式:
1. 配置Kafka事务管理器并使用@Transactional注解
2. 使用KafkaTemplate的executeInTransaction方法
使用@Transactional注解方式实现Kafka事务:
首先需要配置KafkaTransactionManager, 需要使用生产者工厂来创建这个事务管理类。注意:需要在producerFactory中开启事务功能,并设置TransactionIdPrefix,TransactionIdPrefix是用来生成Transactional.id的前缀。
@Bean
public ProducerFactory<Integer, String> producerFactory() {
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(senderProps());
factory.transactionCapable();
factory.setTransactionIdPrefix("tran-");
return factory;
}
@Bean
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
return manager;
}
@Test
@Transactional
public void testTransactionalAnnotation() throws InterruptedException {
kafkaTemplate.send("topic.quick.tran", "test transactional annotation");
throw new RuntimeException("fail");
}
使用KafkaTemplate.executeInTransaction开启事务:这种方式开启事务是不需要配置事务管理器的,也称为本地事务
@Test
public void testExecuteInTransaction() throws InterruptedException {
kafkaTemplate.executeInTransaction(new KafakOperations.OperationsCallback() {
@Override
public Object doInOperations(KafakOperations kafkaOperations) {
kafkaOperations.send("topic.quick.tran", "test executeInTransaction");
throw new RuntimeException("fail");
}
});
}
为了支持事务,Kafka 0.11.0版本引入了以下概念:
1. 事务协调者:类似于消费者负载均衡的协调者,每一个实现事务的Producer都被分配到一个事务协调者(Transaction Coordinator);
2. 引入一个内部Kafka Topic作为事务Log:类似于消费者管理Offset的Topic,事务Topic本身也是持久化的,日志信息记录事务状态信息,由事务协调者写入;
3. 引入控制消息(Control Messages): 这些消息是客户端产生的并写入到主题的特殊消息,但对于消费者来说是不可见。它们用来让broker告知消费者之前拉取的消息是否被原子性提交;
4. 引入TransactionId: 不同生产者实例使用同一个TransactionId表示是同一个事务,可以跨Session的数据幂等性发送。当具有相同TransactionId的新的Producer实例被创建且工作时,旧的拥有相同TransactionId的Producer将不再工作,避免事务僵死;
5. Producer ID: 每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。主要为提供幂等性时引入的;
6. Sequence Number:对每个PID,该Producer发送数据的每个都对应一个从0开始单调递增的Sequence Number;
7. 每个生产者增加epoch:用于标识同一个事务ID在一次事务中的epoch,每次初始化事务时会递增,从而让服务端可以知道生产者请求是否旧的请求;
8. 幂等性:保证发送单个分区的消息只会发送一次,不会出现重复消息。增加一个幂等性开关enable.idempotence,可以独立于事务使用,即可以只开启幂等性但不开启事务。
分库分表后,如何解决全表查询的问题,几种思路:
1. 设置全局表/广播表(小表广播);适合于数据量比较小,很少发生修改的表,在每个库中保存一份。需要注意一致性问题。
2. 字段冗余;反范式设计。比较适合依赖的字段比较少的情况。需要考虑一致性问题:比如依赖表的字段做了修改,可通过触发器或者业务层面进行同步修改。或者binlog订阅修改。
3. 数据表的同步;定时将A库中的表X同步到B库,这样可以在B库进行和表X相关的处理;
4. 系统层面处理组装。可考虑多线程处理,节省时间开销。
5. 订阅binlog,将相关聚合操作,在订阅binlog的服务进行处理,进行缓存或者持久化数据库。也就是流式计算。
6. 可以在业务处理的同时,将记录发送的MQ,在MQ消费端进行处理,提前计算好聚合操作的结果。
7. 如果聚合操作/全表查询的总记录数比较少,可直接缓存在NOSQL中。
8. 使用搜索引擎。比如elasticsearch。
jstat -gcutil <pid> 1000
在主从模式的Redis集群中,如果Master宕机,假设主从都没有做数据持久化,此时千万不要立马重启主服务,否则会造成数据丢失,正确的操作如下:
1. 在slave上执行slaveof on one, 断开主从关系,并把slave升级为主库;
2. 此时重启master,执行slaveof,把它设置为从库,连接到主库,做主从复制,自动备份数据。
为什么Redis-Cluster使用16384个槽(slots)?
The reason is:
1. Normal heartbeat packets carry the full configuration of a node, that can be replaced in an idempotent way with the old in order to update an old config. This means they contain the slots configuration for a node, in raw form, that uses 2k of space with 16k slots, but would use a prohibitive 8k of space using 65k slots.
2. At the same time it is unlikely that Redis Cluster would scale to more than 1000 mater nodes because of other design tradeoffs.
So 16k was in the right range to ensure enough slots per master with a max of 1000 maters, but a small enough number to propagate the slot configuration as a raw bitmap easily. Note that in small clusters the bitmap would be hard to compress because when N is small the bitmap would hava slots/N bits set that is a large percentage of bits set.
JDK7中HashMap在并发扩容时,同时执行transfer方法,如果原始链表相邻的两个元素,扩容后仍是相邻的两个元素,由于采用了头插入,会造成两个元素形成互为首尾,形成死循环。
mysql> show processlist;
如何检测重复的jar包?
1. 如果是maven工程,可用mvn dependency:tree
2. 代码检测。在各种版本的jar包中挑选一个一定会加载的类,加上重复类检查,示例如下:
static {
Duplicate.checkDuplicate(xxx.class);
}
public final class Duplicate {
private Duplicate() {}
public static void checkDuplicate(Class clz) {
checkDuplicate(clz.getName().replace('.', '/') + ".class");
}
public static void checkDuplicate(String path) {
try {
//在ClassPath中搜索文件
Enumeration<URL> urls = Thread.currentThread().getContextClassLoader().getResources(path);
Set<String> files = new HashSet<>();
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
if (url != null) {
String file = url.getFile();
if (file != null && file.length() > 0) {
files.add(file);
}
}
}
//如果有多个,就表示重复
if (files.size() > 1) {
log.error("Duplicate class " + path + " in " + files.size() + " jar " + files);
}
} catch (Throwable t) { //防御性容错
log.error("Error checkDuplicate for:" + path, t);
}
}
}
kill之前先dump。每次线上环境一出问题,通常最直接的办法回滚重启,以减少故障时间,这样现场就被破坏了,要想事后查问题就麻烦了,有些问题必须在线上的大压力下才会发生,线下测试环境很难重现,不太可能让开发或 Appops 在重启前,先手工将出错现场所有数据备份一下,所以最好在 kill 脚本之前调用 dump,进行自动备份,这样就不会有人为疏忽。dump脚本示例:
JAVA_HOME=/usr/java
OUTPUT_HOME=~/output
DEPLOY_HOME=`dirname $0`
HOST_NAME=`hostname`
DUMP_PIDS=`ps --no-heading -C java -f --width 1000 | grep "$DEPLOY_HOME" |awk '{print $2}'`
if [ -z "$DUMP_PIDS" ]; then
echo "The server $HOST_NAME is not started!"
exit 1;
fi
DUMP_ROOT=$OUTPUT_HOME/dump
if [ ! -d $DUMP_ROOT ]; then
mkdir $DUMP_ROOT
fi
DUMP_DATE=`date +%Y%m%d%H%M%S`
DUMP_DIR=$DUMP_ROOT/dump-$DUMP_DATE
if [ ! -d $DUMP_DIR ]; then
mkdir $DUMP_DIR
fi
echo -e "Dumping the server $HOST_NAME ...\c"
for PID in $DUMP_PIDS ; do
$JAVA_HOME/bin/jstack $PID > $DUMP_DIR/jstack-$PID.dump 2>&1
echo -e ".\c"
$JAVA_HOME/bin/jinfo $PID > $DUMP_DIR/jinfo-$PID.dump 2>&1
echo -e ".\c"
$JAVA_HOME/bin/jstat -gcutil $PID > $DUMP_DIR/jstat-gcutil-$PID.dump 2>&1
echo -e ".\c"
$JAVA_HOME/bin/jstat -gccapacity $PID > $DUMP_DIR/jstat-gccapacity-$PID.dump 2>&1
echo -e ".\c"
$JAVA_HOME/bin/jmap $PID > $DUMP_DIR/jmap-$PID.dump 2>&1
echo -e ".\c"
$JAVA_HOME/bin/jmap -heap $PID > $DUMP_DIR/jmap-heap-$PID.dump 2>&1
echo -e ".\c"
$JAVA_HOME/bin/jmap -histo $PID > $DUMP_DIR/jmap-histo-$PID.dump 2>&1
echo -e ".\c"
if [ -r /usr/sbin/lsof ]; then
/usr/sbin/lsof -p $PID > $DUMP_DIR/lsof-$PID.dump
echo -e ".\c"
fi
done
if [ -r /usr/bin/sar ]; then
/usr/bin/sar > $DUMP_DIR/sar.dump
echo -e ".\c"
fi
if [ -r /usr/bin/uptime ]; then
/usr/bin/uptime > $DUMP_DIR/uptime.dump
echo -e ".\c"
fi
if [ -r /usr/bin/free ]; then
/usr/bin/free -t > $DUMP_DIR/free.dump
echo -e ".\c"
fi
if [ -r /usr/bin/vmstat ]; then
/usr/bin/vmstat > $DUMP_DIR/vmstat.dump
echo -e ".\c"
fi
if [ -r /usr/bin/mpstat ]; then
/usr/bin/mpstat > $DUMP_DIR/mpstat.dump
echo -e ".\c"
fi
if [ -r /usr/bin/iostat ]; then
/usr/bin/iostat > $DUMP_DIR/iostat.dump
echo -e ".\c"
fi
if [ -r /bin/netstat ]; then
/bin/netstat > $DUMP_DIR/netstat.dump
echo -e ".\c"
fi
echo "OK!"
BTrace is a safe, dynamic tracing tool for the Java platform. BTrace can be used to dynamically trace a running Java program(similar to DTrace for OpenSolaris applications and OS). BTrace dynamically instruments the classes of the target application to inject tracing code("bytecode tracing").
BTrace的主要术语:
- Probe Point: “location” or “event” at which a set of tracing statements are executed. Probe point is “place” or “event” of interest where we want to execute some tracing statements.(探测点,就是我们想要执行一些追踪语句的地方或事件)
- Trace Actions or Actions: Trace statements that are executed whenever a probe “fires”.(当探测触发时执行追踪语句)
- Action Methods: BTrace trace statements that are executed when a probe fires are defined inside a static method a class. Such methods are called “action” methods.(当在类的静态方法中定义了探测触发时执行的BTrace跟踪语句。这种方法被称为“操作”方法。)
Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理
1. 创建Topic时;
2. 生产者发送消息时;
3. 消费者消费数据时;
4. kafka集群扩容时: kafka-reassign-partitions
--------------------------------------
消费者端的分区分配策略:Kafka提供了消费者客户端参数partition.assignment.strategy用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka中还提供了另外两种分配策略: RoundRobinAssignor和StickyAssignor。消费者客户端参数partition.assignment.strategy可用配置多个分配策略,彼此之间逗号分割。
RangeAssignor分配策略:原理是按照分区总数和消费者总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个topic, RangeAssignor策略会将消费者组内所有订阅这个topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
--------------------------------------
分区器Partitions:
KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景下,需要具体业务逻辑控制每条消息落到合适的分区,大部分情况下,使用默认分配规则即可。在KafkaProducer计算分配时,首先根据ProducerRecord中的partition字段指定的序号计算分区。如果没有指明,则使用kafka默认实现的org.apache.kafka.clients.producer.DefaultPartitioner。从源码可以看出默认的DefaultPartititioner的计算方式如下:
1. 如果key为null, 优先从可用的分区里轮询一个分区;如果没有可用的分区,则从所有分区里轮询一个分区;
2. 如果key不为null, 使用murmur的Hash算法(非加密型hash函数,具备高运算性能及低碰撞率)来计算分区分配。
KafkaProducer支持自定义分区分配方式,首先需要实现和DefaultPartitioner一样的接口org.apache.kafka.clients.producer.Partitioner,然后在KafkaProducer的配置中指定partitioner.class为自定义的分区器。
properties.put("partitioner.class", "com.x.y.z");
-------------------------------------
Kafka集群中增加broker非常方便,但是Topic的Partition不会因为集群中broker的增加而自动增加。可将分布在整个集群上的Partition重新分配到某些机器上,然后可以停止不需要的broker从而实现节约资源的目的。
每个Partition可以有多个Replica,即AR列表。在这个列表中第一个Replica称为Preferred Replica。创建Topic时需要确保Topic的所有Preferred Replica均价分布在Kafka集群中。理想场景中一个Partition中的Leader Replica应该Preferred Replica。这就保证了集群所有的Leader Replica带来的负载在整个集群中是均衡的,如果Broker Shutdown的话,那么Leader Replica带来的负载就不均衡了。
每个Partition只有Leader Replica对外提供读写服务,并且Partition创建时默认的Leader Replica位于Preferred Replica之上,此时Kafka集群是负载均衡的。Broker Shutdown会导致Leader Replica发生迁移,导致Leader Replica在kafka集群中不再均衡,因此某些broker的压力明显大于其他节点。
诊断k8s里容器里的Java进程:
kubectl exec -it <pod> --container <containerId> -- /bin/bash -c "wget https://alibaba.github.io/arthas/arthas-boot.jar && java -jar arthas-boot.jar"
Arthas的thread -b: 找出当前阻塞其他线程的线程。
有时候发现应用卡住了,通常是由于某个线程拿到了某个锁,并且其他线程都在等待这把锁造成的,为了排查这类问题,Arthas提供了thread -b一键找出罪魁祸首。注意:目前只支持synchronized关键字阻塞住的线程,如果是java.util.concurrent.Lock,目前不支持。
---------------------------------
创建topic时如何选择合适的分区数?
从吞吐量方面考虑,增加合适的分区数可以很大程度上提升整体吞吐量,但是超过对应的阈值之后吞吐量不升反降。如果应用对吞吐量有着一定程度上的要求,建议在投入生产环境之前对同款硬件资源做一个完备的吞吐量相关的测试,以找到合适的分区数阈值期间。
在创建完主题之后,虽然我们还是能够增加分区的个数,但是基于key计算的主题需要严谨对待。当生产者向Kafka中写入基于key的消息时,Kafka通过消息的key来计算出消息将要写入到哪个具体的分区中,这样具有相同key的数据可以写入到同一个分区中。Kafka的这一功能对于一部分应用是即为重要的,比如日志压缩。
再比如对于同一个key的所有消息,消费者需要按消息的顺序进行有序的消费,如果分区的数量发生变化,那么有序性就得不到保证。在创建主题时,最好能够确定好分区数,这样也可以省去后期增加所带来的多余操作。尤其对于与key高关联的应用,在创建主题时可以适当地多创建一些分区,以满足未来的需求。通常情况下,可以根据未来2年内的目标吞吐量来设定分区数。当然如果应用与key弱关联,并且也具备便捷的增加分区数的操作接口,那么也可以不用考虑那么长远的目标。
有些应用场景会要求主题中的消息都能保证顺序性,这种情况下在创建主题时可以设定分区数为1,这样通过分区有序性的这一特性来达到主题有序性的目的。
当然分区数也不能一昧地增加,分区数会占用文件描述符,而一个进程所能支配的文件描述符是有限的,这个也是我们通常意义上所说的文件句柄的开销。虽然我们可以通过修改配置来增加可用文件描述符的个数,但是凡事总有一个上限,在选择合适的分区数之前,最好再考量一下当前Kafka进程中已经使用的文件描述符的个数。
分区数的多少还会影响系统的可用性。
Kafka通过多副本机制来实现集群的高可用和高可靠,每个分区都会有一至多个副本,每个副本分别存在于不同的broker节点上,并且只有leader副本对外提供服务。在Kafka集群的内部,所有的副本都采用自动化的方式进行管理,并确保所有的副本中的数据都能保持一定程度上的同步。当broker发生故障时,对于leader副本所宿主的broker节点上的所有分区将会暂时处于不可用的状态,此时Kafka会自动的在其他的follower副本中选举出新的leader用于接收外部客户端的请求,整个过程由Kafka控制器负责完成。分区进行leader角色切换的过程中会变得不可用,不过对于单个分区来说这个过程非常的短暂,对于用户而言可以忽略不计。但是如果集群中的某个broker节点宕机,那么就会有大量的分区需要同时进行leader角色切换,这个切换的过程将会耗费一笔可观的时间,并且在这个时间窗口内这些分区也会变得不可用。
假如,一个3节点的Kafka集群中存在3000个分区,每个分区拥有3个数据副本。当其中一个broker节点宕机时,所有1000个分区同时变得不可用。假设每一个分区恢复时间是5ms,那么1000个分区的恢复时间将会花费5秒钟。因此,在这种情况下,用户将会观察到系统存在5秒钟的不可用时间窗口。可以适当地增加一些broker节点来减少单broker节点所负荷的分区,进而降低单broker节点故障引起的短期服务不可用的影响。
如果宕机的broker节点恰好又是Kafka集群的控制器时,在控制器被重新选举到新的broker节点之前这些分区leader角色切换的过程是不会开始进行的。虽说控制器的恢复(重新选举新的控制器)也是自动进行的,整体上不会有太大的问题,但是新的控制器需要加载集群中所有的元数据信息,其中就包括了所有的分区信息,分区数越多加载的耗时就会越长,进而拖慢了控制器的恢复进度,最终也就拖慢了分区服务的恢复进度。
分区数越多也会让Kafka的正常启动和关闭的耗时变得越长,与此同时,主题的分区数越多不仅会增加日志清理的耗时,而且在被删除时也会耗费更多的时间。对于旧版的生产者和消费者客户端而言,分区数越多也会增加它们的开销,不过这一点在新版的生产者和消费者客户端中有效地得到了抑制。
如何选择合适的分区数?
从某种意思来说,考验的是决策者的实战经验,更透彻地来说,是对Kafka本身、业务应用、硬件资源、环境配置等多方面的考量而做出的抉择。在设定完分区数,或者更确切的说是创建完主题之后,还要对其追踪、监控、调优以求更改更好的利用它。读者看到本文的内容之前或许没有对分区数有太大的困扰,可能看完之后反而困惑了起来,其实大可不必太过惊慌,一般情况下,根据预估的吞吐量以及是否与key相关的规则来设定分区数即可,后期可以通过增加分区数、增加broker或者分区重分配等手段来进行改进。如果一定要给一个准则的话,笔者给的一个建议是分区数设定为集群中broker的倍数,即假定集群中有3个broker节点,可以设定分区数为3、6、9等,至于倍数的选定可以参考预估的吞吐量。不过,如果集群中的broker节点数有很多,比如大几十或者上百、上千,这种准则也不太适用,在选定分区数时进一步的可以引入基架等参考因素。
我经常碰到的问题类似于,官网说每秒能到10MB,为什么我的producer每秒才1MB? —— 且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)
Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。
另外,Kafka并不能真正地做到线性扩展(其实任何系统都不能),所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。
---------------------------------
Rebalance是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展,不过要注意:在Rebalance期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。
消费者通过定期发送心跳-hearbeat到一个组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。
如果消费者超过一定时间没有发送心跳,那么它的会话-session就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看出,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能消费消息;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在0.11.1版本中,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外,更高版本的Kafka支持配置一个消费者多长时间不拉取消息仍然保持存活,这个配置可以避免活锁(live lock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
之前的版本将offset信息存储在Zookeeper中(/consumers/<group.id>/offsets/<topic>/<partitionId>,ZK写操作性能不高),从Kafka 0.8.2开始Kafka开始支持将consumer的位移消息保存在Kafka内部的topic中(__concumer_offsets),从0.9.0开始默认将offset存储到topic中。Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个Kafka Server都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。
发生如下条件时,partition要在consumer中重新分配:
1. 新的consumer加入
2. 旧的consumer挂了
3. coordinator挂了,集群选举出新的coordinator
4. topic的partition新增
5. consumer调用了unsubscrible(), 取消topic的订阅
-------------------------------------------------
JVM晋升到老年代的动态年龄判断:
老版本的JVM的表述如下:虚拟机并不是永远地要求对象的年龄必须达到MaxTenuringThreshold才能晋升到老年代,如果在Survivor空间中相同年龄所有对象大小的总和大于Survivor空间的一半,年龄大于或等于该年龄的对象就可以直接进入老年代,无须等到MaxTenuringThreshold中要求的年龄。
新的动态年龄判断算法:Survivor区的对象年龄从小到大进行累加,当累加到X年龄时的总和大于50%(可以使用-XX:TargetSurvivorRatio来设置保留多少空闲空间,默认是50),那么比X大的都会晋升到老年代。
JVM引入动态年龄计算,主要基于如下两点:
1. 如果固定按照MaxTenuringThreshold设定的阈值作为晋升条件:a):MaxTenuringThreshold设置的过大,原本应该晋升的对象一直停留在Survivor区,直到Survivor区溢出,一旦溢出发生,Eden+Survivor中对象将不再依据年龄全部提升到老年代,这样对象老化的机制就失效了。b):MaxTenuringThreshold设置的过小,“过早晋升”,即对象不能在新生代充分被回收,大量短对象被晋升到老年代,老年代空间迅速增长,引起频繁FullGC。分代回收失去了意义,严重影响GC性能。
2. 相同应用在不同时间的表现不同:特殊任务的执行或者流量成分的变化,都会导致对象的生命周期分布波动,那么固定的阈值设置,无法动态适应变化,会造成和上面同样的问题。
-------------------------------------------------
JVM年轻代到年老代的晋升过程的判断条件是什么:
1. 部分对象会在Survivor From和To区域来回复制,直到对象年龄到达MaxTenuringThreshold;
2. 如果对象的大小大于Eden区的二分之一会直接分配到老年代。如果老年代也分配不下,会做一次FullGC;如果小于Eden的一半但是没有足够的空间,就进行MinorGC;
3. Minor GC后,Survivor仍然放不下,则放在老年代;
4. 动态年龄判断,大于等于某个年龄的对象超过了Survivor空间一半,大于等于某个年龄的对象直接进入老年代;
By default, a participating transaction will join the characteristics of the outer scope, silently ignoring the local isolation level, timeout value or read-only flag(if any). Consider switching the "validateExistingTransactions" flag to "true" on your transaction manager if you'd like isolation level declarations to get rejected when participating in an existing transaction with a different isolation level. This non-lenient mode will also reject read-only mismatches, i.e. an inner read-write transaction trying to participate in a read-only outer scope.
(PROPAGATION_REQUIRED)However, in the case where an inner transaction scope sets the rollback-only marker, the outer transaction has not decided on the rollback itself, and so the rollback (silently triggered by the inner transaction scope) is unexpected. A corresponding UnexpectedRollbackException is thrown at that point. This is expected behavior so that the caller of a transaction can never be misled to assume that a commit was performed when it really was not. So if an inner transaction (of which the outer caller is not aware) silently marks a transaction as rollback-only, the outer caller still calls commit. The outer caller needs to receive an UnexpectedRollbackException to indicate clearly that a rollback was performed instead.
怎么实现所有线程在等待某个事件的发生才会去执行:
1. 读写锁:刚开始主线程获取写锁,然后所有子线程获取读锁,然后等事件发生后,主线程释放写锁;
2. CountDownLatch: 初始值设为1,所有子线程调用await等待,等事件发生后,调用countDown方法计数减一;
3. Semaphore: Semaphore初始值为N,主线程先调用acquire(N)申请N个信号量,其他线程调用acquire()阻塞等待,等事件发生后,主线程释放N个信号量。
4. CycliBarrier
MVCC比锁定的优势:
1. 使用MVCC多版本并发控制比锁定模式的主要优点是在MVCC里,对检索(读)数据的锁要求与写数据的锁要求不冲突,所以读不会阻塞写,而写页不会阻塞读。
2. 在数据库里也有表和行级别的锁定机制,用于给那些无法轻松接受MVCC行为的应用。不过,恰当地使用MVCC总会提供比锁更好地性能。
InnoDB:通过为每行记录添加两个额外的隐藏的值来实现MVCC,这两个值一个记录这行数据何时被创建,另外一个记录这行数据何时过期(或者被删除)。但是InnoDB并不存储这些事件发生时的实际时间,相反它只存储这些事件发生时的系统版本号。这是一个随着事务的创建而不断增长的数字。每个事务在事务开始时会记录它自己的系统版本号。每个查询必须去检查每行数据的版本号与事务的版本号是否相同。
-------------------------------------------------------------
在隔离级别是Repeatable Read时这种策略是如何应用MVCC的:
Select InnoDB必须保证每行数据符合两个条件:
1. InnoDB必须找到一个行的版本,它至少要和事务的版本一样老(即它的版本不大于事务的版本号)。这保证了不管是事务开始之前,或者事务创建时,或者修改了这行数据的时候,这行数据是存在的。
2. 这行数据的删除版本必须是未定义的或者比事务版本要大。这可以保证在事务开始之前这行数据没有被删除。这里不是真正的删除数据,而是标记出来的删除。真正意义的删除是在commit的时候。
符合这两个条件的行可能会被当做查询结果而返回。
Insert: InnoDB为这个新行记录当前的系统版本号;
Delete: InnoDB将当前的系统版本号设置为这行的删除ID;
Update: InnoDB会写一个这行数据的新拷贝,这个拷贝版本为当前的系统版本号。它同时也会将这个版本号写到旧行的删除版本里。
这种额外的记录所带来的结果就是对于大多数查询来说根本就不需要获得一个锁。他们只是简单地以最快的速度来读取数据,确保只选择符合条件的行。这个方案的缺点在于存储引擎必须为每一行存储更多的数据,做更多的检查工作,处理更多的善后操作。
-------------------------------------------------------------
MVCC只工作在Repeatable Read和Read Committed隔离级别下。Read uncommitted不是MVCC兼容的,因为查询不能找到符合他们事务版本的行版本:它们每次只能读到最新的版本。Seriablabel也与MVCC不兼容,因为读操作会锁定它们返回的每一行数据。
快照读:MVCC读取的是快照中的数据,可以减少加锁带来的开销;
当前读:读的是最新数据,需要加锁;
Next-key Lock结合MVCC解决幻读问题:Next-key Lock是Record Lock和Gap Lock的结合。Record Lock: 锁定一个记录的索引,而不是记录本身;Gap Lock: 锁定索引之间的空隙,但不包含索引本身。
InnoDB的MVCC可以是通过在每行记录中保存两个隐藏的列来实现的:创建事务ID、删除事务ID。每开始一个新的事务,系统版本号(可以理解为事务的ID)就会自动递增,事务开始时刻的系统版本号会作为事务ID。
InnoDB的最基本行记录(row)中包含一些额外的存储信息:DATA_TRX_ID, DATA_ROLL_PTR, DB_ROW_ID, DELETE BIT。
DATA_TRX_ID:6字节,标记了最新更新这行记录的transaction id, 每处理一个事务,事务ID自动增加1;
DATA_ROLL_PTR:7字节,指向当前记录项的rollback segment的undo log记录,找之前版本的数据就是通过这个指针;
DB_ROW_ID:6字节,innodb自动产生聚集索引时,聚集索引包括这一列,否则聚集索引中不包括这个值;
DELETE BIT: 用于标识该记录是否被删除,这里不是真正的删除数据,而是标记的删除。真正的删除是在commit的时候。
InnoDB的MVCC实现方式如下:
a: 事务以排他锁的形式修改原始数据;
b: 把修改前的数据存放到undo log,通过回滚指针与主数据关联;
c: 修改成功(commit)啥都不做,失败则恢复undo log中的数据(rollback)。
MySQL通过MVCC以及(Next-Key Lock)实现了可重复读(Repeatable Read),其思想(MVCC)就是记录数据的版本变迁,通过选择不同数据的版本从而能够对用户呈现一致的结果。
注意MVCC仅仅在纯select时有效(不包括select for update, lock in share mode等加锁操作,以及update/insert等)。
/* A recursive result-bearing ForkJoinTask
*/
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
V result;
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
protected final boolean exec() {
result = compute();
return true;
}
}
/* A recursive resultless ForkJoinTask. This class establishes conventions to parameterize resultless actions as Void ForkJoinTasks. Because null is the only valid value of type Void, methods such as join always return null upon completion.
*/
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
protected abstract void compute();
public final Void getRawResult { return null; }
protected final void setRawResult(Void mustBeNull) {}
protected final boolean exec() {
compute();
return true;
}
}
/*
A scalable concurrent ConcurrentNavigableMap implementation. The map is sorted according to Comparable natural ordering of its keys, or by a Comparator provided at map creation time, dependening on which constructor is used.
This class implements a concurrent variant of SkipLists providing expected average log(n) time cost for the containsKey, get, put and remove operations and their variants. Insertion, removal, update, and access operations safely execute concurrently by multiple threads.
Iterators and spliterators weakly consistent.
Ascending key ordered views and their iterators are faster than descending ones.
All Map.Entry pairs returned by methods in this class and its views represent snapshots of mappings at the time they were produced. They do not support the Entry.setValue method. (Note however that it is possible to change mapping in the associated map using put, putIfAbsent, or replace, depending on exectly which effect you need.)
Beware that, unlike in most collections, the size method is not a constant-time operation. Because of the asynchronous nature of these maps, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal. Additionally, the bulk operations putAll, equals, toArray, containsValue, clear are not guaranteed to be performed atomically. For example, an iterator operating concurrently with a putAll operation might view only some of the added elements.
This class and its views and iterators implement all of the optional mehtods of the Map and Iterator interfaces. Like most other concurrent collections, this class does not permit the use of null keys or values because some null return values cannot be reliably distinguished from the absence of elements.
*/
public class ConcurrentSkipListMap<K, V> extends AbstractMap<K, V>
implements ConcurrentNavigableMap<K, V>, Cloneable, Serializable {
}
哪些情景下会造成消息漏消费: 先提交offset,后消费消息。自动commit功能。
AOP concepts:
1. Aspect: a modularization of a concern that cuts across multiple classes.
2. Join point: a point during the execution of a program, such as the execution of a method or the handling of an exception. In Spring AOP, a join point always represents a method execution.
3. Advice: action taken by an aspect at a particular join point. Different types of advice include "around", "before" and "after" advice. Many AOP frameworks, including Spring, model an advice as an interceptor, maintaining a chain of interceptors aroud the join point.
4. Pointcut: a predicate that matches join points. Advice is associated with a pointcut expression and runs at any join point matched by the pointcut(for example, the execution of a method with a certain name). The concept of join point as matched by pointcut expressions is central to AOP, and Spring uses the AspectJ pointcut expression language by default.
5. Introduction: declaring additional methods or fields on behalf of a type. Spring AOP allows you to introduce new interfaces( and a corresponding implementation) to any advised object. For example, you could use an introduction to make a bean implement an IsMofified interface, to simplify caching. (An introduction is known as an inter-type declaration in the AspectJ community.)
6. Target object: object being advised by one ore more aspects. Also referred to as the advised object. Since Spring AOP is implemented using runtime proxies, this object will always be a proxied object.
7. AOP proxy: an object created by the AOP framework in order to implement the aspect contracts (advise method executions and so on). In the Spring Framework, an AOP proxy will be a JDK dynamic proxy or a CGLIB proxy.
8. Weaving: linking aspects with other application types or objects to create an advised object. This can be done at compile time(using the AspectJ compiler, for example), load time, or at runtime. Spring AOP, like other pure Java AOP framework, perform weaving at runtime.
Eureka: Why is it so slow to register a service?
Being an instance also involves a periodic heartbeat to the registry(through the client's serviceUrl) with a default duration of 30 seconds. A service is not available for discovery by clients unit the instance, the server, and the client all have the same metadata in their local cache (so it could take 3 hearbeats). You can change the period by setting eureka.instance.leaseRenewalIntervalInSeconds. Setting it to a value of less than 30 speeds up the process of getting clients connected to other services. In production, it is probably better to stick with the default, because of internal computations in the server that make assumptions about the lease renewal period.
eureka.server.enable-self-preservation
Eureka开发测试环境快速剔除失效服务:
1. 服务端设置:
#关闭保护机制
eureka.server.enable-self-preservation=false
#剔除失效服务间隔
eureka.server.eviction-interval-timer-in-ms=2000
2. 客户端配置:
#Eureka客户端向服务端发送心跳的时间间隔。(客户端告知服务端会按照此规则)
eureka.instance.lease-renewal-interval-in-senconds=10
#Eureka服务端在收到最后一次心跳之后等待的时间上限,超过则剔除(客户端告知服务端按此规则等待客户端)
eureka.instance.lease-expiration-duration-in-senconds=5
上面3的时间的默认值分别为:
EurekaServerConfigBean: private long evictionIntervalTimerInMs = 60 * 1000; #服务端60秒的剔除间隔
/**
* Indicates how often (in seconds) the eureka client needs to send heartbeats to
* eureka server to indicate that it is still alive. If the heartbeats are not
* received for the period specified in leaseExpirationDurationInSeconds, eureka
* server will remove the instance from its view, there by disallowing traffic to this
* instance.
*
* Note that the instance could still not take traffic if it implements
* HealthCheckCallback and then decides to make itself unavailable.
*/
EurekaInstanceConfigBean: private int leaseRenewalIntervalInSeconds=30;
/**
* Indicates the time in seconds that the eureka server waits since it received the
* last heartbeat before it can remove this instance from its view and there by
* disallowing traffic to this instance.
*
* Setting this value too long could mean that the traffic could be routed to the
* instance even though the instance is not alive. Setting this value too small could
* mean, the instance may be taken out of traffic because of temporary network
* glitches.This value to be set to atleast higher than the value specified in
* leaseRenewalIntervalInSeconds.
*/
EurekaInstanceConfigBean: private int leaseExpirationDurationInSeconds=90;
分区中的所有副本统称为AR(Assigned Repllicas)。所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas),由此可见:AR=ISR+OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR集合为空。
Leader副本负责维护和跟踪ISR集合中所有的follower副本的滞后状态,当follower副本落后太多或者失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中的follower副本追上了leader副本,之后在ISR集合中的副本才有资格选举为leader,而在OSR集合中的副本则没有机会(这个原则可以通过修改对应的参数配置来改变)。
ISR的伸缩:
1. Kafka在启动的时候会启动两个与ISR相关的定时任务,分别为: isr-expiration和isr-change-propagation。isr-expiration任务会周期性的检测每个分区是否需要缩减其ISR集合。这个周期和replica.lag.time.max.ms参数有关,大小是这个参数的一半。默认值为5000ms,当检测到ISR中有失效的副本的时候,就会缩减ISR集合。如果某个分区的ISR集合发生变更,则会将变更后的数据记录到ZK对应的节点/brokers/topics/partition/state节点中,节点数据示例如下:{"controller_epoch":26, "leader":0, "version":1, "leader_epoch":2,"isr":{0, 1}},其中controller_epoch表示的是当前的kafka控制器epoch。leader表示当前分区的leader副本所在的broker ID。version固定值1,leader_epoch表示当前分区的leader纪元,isr表示变更后的isr列表。
2. 除此之外,当ISR集合发生变更的时候还会将变更后的记录缓存到isrChangeSet中,isr-change-propagation任务会周期性(固定值2500ms)地检查isrChangeSet,如果发现isrChangeSet中有ISR集合的变更记录,那么它会在ZK的/isr_change_notification的路径下创建一个以isr_change开头的持久顺序节点(比如isr_change_notification/isr_change_0000000),并将isrChangeSet中的信息保存到这个节点。kafka控制器为/isr_chang_notification添加了一个Watcher,当这个节点有子节点发生变化的时候会触发Watcher动作,以此通知控制器更新相关的元数据并向它管理的broker节点发送更新元数据的请求。最后删除/isr_change_notification的路径下已经处理过的节点。频繁的触发Watcher会影响kafka控制器,ZK甚至其他broker的性能。为了避免这种情况,kafka添加了指定的条件,当检测到分区ISR集合发生变化的时候,还需要检查两个条件:
1): 上一次ISR集合发生变化已经超过5秒;
2): 上一次写入ZK的时候距离现在已经超过60秒;
满足以上两个条件之一就可以将ISR写入集合的变化的目标节点。
3. 有缩减就会有补充,kafka何时扩充ISR。随着follower副本不断进行消息同步,follower副本LEO也会逐渐后移,并且最终赶上leader副本,此时follower副本就有资格进入ISR集合,追赶上leader副本的判断准则是此副本的LEO是否小于leader副本的HW,这里并不是和leader副本的LEO相比。ISR扩充之后同样也会更新ZK中的/broker/topics/partition/state节点和isrChangeSet,之后步骤就和ISR收缩相同。
4. 当ISR集合发生增减时,或ISR集合中任意副本LEO发生变化时,都会影响到整个分区的HW。
kafka的LW是Low Watermark的缩写,低水位。代表AR集合中最小的logStartOffset值,副本的拉取请求(FetchRequest, 它有可能触发新建日志segment而旧的被清理,进而导致logStartOffset的增减)和删除请求(DeleteRecordRequest)都有可能促使LW的增长。
Kafka怎么实现消费者多线程:
1. 每个partition分配一个实例/线程,尽量确保一个实例/线程只消费一个分区;
2. 每个实例里多个线程消费多个partition,如果不要求分区顺序性,则不做处理;如果要求分区顺序性,可内置一个队列,多个线程从队列获取消息;同时可以考虑使用线程池。
public class KafkaProducer<K, V> implements Producer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
private String clientId;
private final Partitioner partitioner;