Skip to content

Latest commit

 

History

History
380 lines (371 loc) · 40 KB

Kafka权威指南读书笔记.md

File metadata and controls

380 lines (371 loc) · 40 KB

Kafka权威指南读书笔记

第一章 初识Kafka

  • 发布与订阅消息系统:发布者以某种方式对消息进行分类,接收者 (订阅者)订阅它们,以便接收特定类型的消息。发布与订阅系统一般会有一个broker ,也就是发布消息的中心点。
  • 文件系统或数据库提交日志用来提供所有事务的持久记录,通过重放这些日志可以重建系统的状态
  • Kafka的数据按照一定顺序持久化保存,可以按需读取
  • Kafka的数据分布在整个系统里,具备数据故障保护和性能伸缩能力
  • 消息
    • Kafka的数据单元,由字节数组组成。消息有一个可选的键,也是一个字节数组,可用于取模后进行消息分区
    • 消息会被分批写入Kafka,一批内的消息属于同一个主题或分区。批次的大小需要在时间延迟和吞吐量上做权衡。批次数据会被压缩,有利于存储和传输,但使用时需要额外计算开销
    • 消息可以具有模式(Schema),如JSON或XML,或者Apache Avro(常用)
  • 主题和分区
    • Kafka的消息通过主题进行分类(类似数据库的表或文件系统的文件夹)
    • 主题可以被分为若干个分区,一个分区就是一个提交日志
    • 消息以追加的方式写入分区,以FIFO顺序读取
    • 由于主题一般包含几个分区,因此无法在整个主题范围内保证消息顺序,只能保证消息在单个分区中有序
    • Kafka通过分区实现数据冗余和伸缩性(分区可以分布在不同服务器上,使主题横跨多服务器)
  • 生产者
    • 创建消息,一般情况下消息会被发布到一个特定主题上,均衡分布到该主题下所有分区中(也能用消息键和分区器写到特定分区)
  • 消费者
    • 读取消息,订阅一个或多个主题,按消息生成的顺序读取
    • 可以通过检查消息的偏移量来区分已读取的消息
  • 偏移量
    • 是个不断递增的整数
    • 是每个消息携带的元数据。偏移量是唯一的
    • 消费者把每个分区最后读取的偏移量保存在ZooKeeper或Kafka(更新的方案)上,以便关闭或重启后恢复读取状态
  • 消费者群组
    • 由一个或多个消费者组成,共同读取某个主题
    • 群组保证每个分区只能被一个消费者使用
    • 一个消费者可以读多个分区
    • 消费者与分区之间的关系通常被称为所有权关系
    • 通过这种方式,消费者可以消费包含大量消息的主题,且若出现消费者失效,群组内其它消费者可进行工作接管
  • broker
    • 一个独立的Kafka服务器
    • 接收来自生产者的消息,为消息设置偏移量,提交消息到磁盘保存
    • 对消费者读取分区的请求作出响应,返回磁盘上的消息
    • 单broker具备处理千级分区和每秒百万级的消息量
    • 每个集群有一个broker充当集群控制器的角色(自动选举)
    • 控制器负责管理,如将分区分配给broker、监控broker
    • 一个分区从属于一个broker,该broker即为该分区的首领
    • 一个分区可以将副本分配给多个broker(以分区复制的方式提供消息冗余),此时原分区为首领副本,其它分区为跟随者副本,后者会从前者复制消息
    • broker失效时其它broker可以接管首领权,但需要相关的生产者与消费者重新连接
  • 保留消息
    • Kafka默认的消息保留策略是保存一段时间或消息达到一定字节数后删除旧消息
    • 主题可以配置自己的保留策略(比如保留到不再使用为止)
    • 可以通过配置把主题当作紧凑型日志, 只有最后一个带有特定键的消息会被保留下来(变更点)
  • Kafka建议部署在多集群的原因:
    • 数据类型分离
    • 安全需求隔离
    • 多数据中心(灾难恢复)
  • Kafka的消息复制仅限于集群内。集群间的消息复制需要用到MirrorMaker,其本质也是在两个集群间维护消息队列,消费者从一个集群读取消息后,由生产者发布至另一集群
  • Kafka的优势
    • 多个生产者:从多个系统中收集数据并以统一格式对外提供,使消费者无需协调不同生产者间的数据流
    • 多个消费者:多个消费者从单消息流上各自读取数据,互不影响。且可设定群组共享消息流并保证每条消息只处理一次
    • 基于磁盘的数据存储:支持持久化非实时消息读取
    • 伸缩性:broker可由少到多灵活扩展
    • 高性能:通过横向扩展生产者、消费者和broker,可保证亚秒级的大数据消息处理延迟

第二章 安装Kafka

  • Kafka使用Zookeeper保存集群的元数据信息和消费者信息,因此可以在Zookeeper上查看Kafka集群状态(如主题、分区、复制数等)
  • Zookeeper
    • Zookeeper集群被称为群组
    • 使用一致性协议,所以建议每个群组里应该包含奇数个节点因为只有当群组里的大多数节点(超过半数)处于可用状态,Zookeeper才能处理外部的请求。并且群组节点过多会导致群组性能下降(一般不超过7个)
    • 更改群组配置(包括增加节点)会需要依次重启每一个节点
    • 节点间通信使用TCP协议,首领选举的TCP端口与通信端口不一样
  • Kafka会把所有消息保存在磁盘上,存放日志片段的目录由log.dirs指定。如果指定了多个路径,则broker会把同一个分区的片段保存在同一路径下,且会用“最少使用”原则分配目录(基于文件数,而不是空间大小
  • Kafka自动创建主题的时机(可以关掉):
    • 生产者往主题写入消息
    • 消费者从主题读取消息
    • 客户端向主题发送元数据请求
  • 主题下分区的数目只增不减
  • 选定分区数量时需要考量的因素:
    • 主题的吞吐量
    • 从单个分区读取数据的最大吞吐量
    • 生产者向单个分区写入数据的最大吞吐量
    • 每个broker包含的分区个数、可用的磁盘空间和网络带宽
    • 如果消息按照键来指定分区写入,增加分区会变得困难
    • 单个broker对分区个数有限,由于内存占用和首领选举时间
  • 消息会被写入日志片段,日志片段大小可以设置,写满一个之后会关闭,然后开启一个新片段。片段被关闭后才开始计算过期时间。因此一个消息真正的有效时间是(写入日志后到日志关闭的时间 + 过期时间
  • 用时间戳获取偏移量时,Kafka会根据日志片段修改时间(已关闭的)找到包含该时间戳的日志片段,并返回日志片段开头的偏移量。因此日志片段越小,结果越准确
  • 磁盘性能影响生产者,内存影响消费者
  • 消费者一般从分区尾部读取消息,如果有生产者存在,则紧跟着生产者,此时消息会直接存放在系统的页面缓存中,比磁盘上读取快
  • 决定broker数量考虑的因素:
    • 单个broker的磁盘空间及集群需要保留的空间数(如果开启复制则需要加倍)
    • 集群处理请求的能力

第三章 Kafka生产者——向Kafka写入数据

  • 生产者向Broker写入数据前,需要经过序列化->分区器->记录批次几个步骤
  • 写入成功后Broker会返回一条元数据记录,否则会重试,若最终还是失败,则返回错误消息
  • 生产者只需要知道两个broker信息,因为可以通过broker获取其它broker信息。第二个用来容错
  • 发送消息的方式:
    1. 发送并忘记(fire-and-forget):发送后并不关心是否到达。大多数情况下消息会正常到达,因为Kafka的高可用性会自动尝试重发
    2. 同步发送:使用send()方法发送消息,返回一个Future对象,调用get()进行等待,获取发送结果
    3. 异步发送:在使用send()方法时指定一个回调函数,服务器在返回响应时调用该函数
  • 当连接失败或出现无首领异常的时候,消息会自动重发,因为这些问题可解决(重建连接、重新选举)。但如果碰到诸如消息过大这类错误,则不会重试,直接抛出异常
  • 生产者的配置:
    • acks:指定必须要有多少个分区副本收到消息,才认为消息写入成功。0代表不用等,1代表首领节点收到消息(如果暂时没首领,生产者会收到错误响应并重发消息,若没收到消息的新节点成为首领则消息会丢失),all代表所有参与复制的节点收到消息
    • buffer.memory:生产者内存缓冲区大小,用于缓冲发送到服务器的消息。当产生消息速度大于发送速度时,send()有可能在此处阻塞
    • max.block.ms:调用send()方法或使用partitionsFor()方法获取元数据时生产者最大阻塞时间。如缓冲区被阻塞或没有可用的元数据时会导致此阻塞,超时会抛出异常
    • compression.type:默认不采用压缩,但使用压缩可以降低网络传输开销和存储开销,即Kafka发送消息的瓶颈
    • retries:失败后消息重发次数,亦可手动设置重试间隔
    • batch.size:消息批次大小,不一定要满了才发,有可能按时发
    • linger.ms:消息批次发送最大等待时间
    • max.in.flight.requests.per.connection:在收到服务器响应前可以发送多少个消息,设置为1时可以保证消息被顺序写入(即使发生了重试)
    • timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms:超时重试时间,超时后要么重试要么返回错误(抛出异常或执行回调)
    • max.request.size:单请求中消息最大值,最好与broker端接收消息配置匹配,以免消息不能被broker接收
    • receive.buffer.bytes和send.buffer.bytes:TCP socket接收和发送数据包的缓冲区大小。设为-1则使用操作系统默认值。生产者或消费者与Broker处于不同数据中心时可以适当增大,因为一般这样的网络延迟高带宽低
  • 分区:
    • 当键为空时,分区器使用轮询(Round Robin)算法将消息均衡分布到各个区上
    • 当键不为空时,分区器对键做散列映射到特定分区上。做散列时会用到同一主题下所有分区(不管是否可用),所以有可能写到不可用的分区,导致错误,但这情况不常见
    • 创建主题时把分区规划好,否则后续新增分区会导致散列值计算不一致
    • 可以自定义分区策略,以免出现数据倾斜

第四章 Kafka消费者——从Kafka读取数据

  • 同一个群组内消费者的数量不应超过分区数,否则会有消费者被闲置
  • 同一个主题可以被多个消费者群组消费,每个群组都可以消费到所有消息
  • 分区的所有权从一个消费者转移到另一个消费者时(消费者不可用或添加了分区),被称为“再均衡”,会导致整个群组一段时间内不可用,且之前消费者的读取状态会丢失,缓存亦会失效,要尽量避免
  • 消费者通过向被指派为“群组协调器”的Broker发送心跳来维持对群组的从属关系及分区的所有权
  • 消费者会在轮询消息或提交偏移量时发送心跳,要是死了会触发再均衡
  • 0.10.1版本中引入了独立心跳线程,可将此操作与轮询消息分离
  • 第一个加入群组的消费者是群主,会从群组协调器Broker那获得群组成员列表(通过心跳保活),并负责给成员分配分区(实现PartitionAssignor接口)
  • 群主把分配情况发给群组协调器Broker,协调器再把信息发送给群成员,成员只能看到自己的分配信息,这个过程每次再均衡的时候都要做
  • 消费者订阅主题时可以将主题名设为正则表达式,这样当符合匹配的新主题加入时会触发一次再均衡,以使消费者可读取新主题。可以用这个方法订阅一类主题
  • 消息轮询是消费者API的核心,订阅主题后,轮询会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,开发者只需要处理从分区返回的数据即可
  • 消费者必须一直轮询消息(调用poll()方法),否则会被以为死掉了
  • poll()方法返回一个记录列表,每条记录包含了所属主题的信息、分区信息、记录在分区的偏移量、记录的键值对
  • 使用close()主动关闭消费者时,网络连接、socket会随之关闭,会提交一次偏移量,并触发一次再均衡。不要去等协调者Broker发现它死了,因为这会导致整个群组在一段时间内无法读取消息
  • 轮询期间的工作尽快完成,别对数据做太多处理,免得耗时
  • 消费者的配置:
    • fetch.min.bytes:消费者从服务器获取记录的最小字节数。Broker收到消费者请求时会等消息到这么多了再发
    • fetch.max.wait.ms:当Broker的数据不足并且收到消费者请求时,等这么长时间就不等了,直接把现有的数据全发过去
    • max.partition.fetch.bytes:服务器从每个分区里返回给消费者的最大字节数,默认1MB。此值必须比broker能接收的最大消息大,否则消费者可能无法读取这些消息(因为太大了),从而导致一直挂起重试。除此之外还要考虑消费者处理消息的时间,免得两次轮询期间处理数据太久导致会话过期(亦可延长会话过期时间)
    • session.timeout.ms:被认定死亡前与服务器断开连接的最长时间,默认3s。此值与heartbeat.interval.ms值息息相关,心跳发送间隔必须小于会话过期时间,一般取三分之一
    • auto.offset.reset:指定消费者在读取没有偏移量的分区或偏移量无效(消费者死太久导致包含偏移量的记录过时被删除)的时候作何处理,默认值为latest,即直接从最新的消息开始读,另一个值是earliest,即从起始位置读取分区记录
    • enable.auto.commit:是否自动提交偏移量,默认为true。会把poll()返回数据中的最大偏移量提交出去。提交动作会在轮询开始时做检查(提交上一次轮询的偏移量)。可以改为false以防止数据重复和丢失,转而自己控制提交时机(true时可控制提交频率)
    • partition.assignment.strategy:分区分配给消费者采取的策略,默认有两种:
      • Range:同一主题下若干个连续分区分给同一个消费者。可能会导致多主题场景下第一个消费者比第二个消费者多很多分区的情况
      • RoundRobin:把所有主题的所有分区轮流发给消费者,故消费者分到的分区数最多差一个
    • max.poll.records:单次调用call()方法返回的记录数量,以控制数据处理速度
    • receive.bufferr.bytes和send.buffer.bytes:socket读取数据时用到的TCP缓冲区大小,设为-1时使用操作系统默认值,生产者或消费者与Broker处于不同数据中心时可以适当增大,因为一般这样的网络延迟高带宽低
  • 提交和偏移量:
    • 更新分区当前位置的操作叫提交
    • 提交偏移量的本质:消费者往一个名为_consumer_offset的特殊主题发送消息,里面包含每个分区的偏移量
    • 偏移量主要用于发生再均衡后的分区状态读取
    • 可以通过调用commitSync()方法手动触发同步提交,会提交poll()记录中的最新偏移量,或commitAsync()异步提交(有可能导致因偏移量提交顺序不一致而出现数据重复,最好在回调里做好失败记录)
    • 可以使用try(poll->commitAsync)catch(记录失败)finally(commitSync->close)的方式组合提交,以确保提交成功
    • 提交偏移量时也支持在方法中传入<分区,偏移量>的Map,就不用每次都提交最新偏移量,转而可以手动指定了
  • onPartitionsRevoked()会在消费者停止读取消息后,再均衡开始前被调用,可以在这里提交偏移量(要提交处理过的,而不是正在处理的,因为有可能会被撤回)
  • onPartitionsAssigned()会在重新分配消费者后,读取消息开始前被调用
  • 使用上面这两个方法需要在订阅主题调用subscribe()方法时多传一个ConsumerRebalanceListener,并在其中实现方法代码
  • 可以用seek()方法指定读取的偏移量,下一次调用poll()就能返回该偏移量上开始的消息,如果偏移量不存在,poll()会抛出异常
  • 退出轮询循环:
    • 在另一个线程里调用consumer.wakeup(),这是消费者唯一一个可以由其它线程安全调用的方法。
    • 调用wakeup()可以使下一次poll()退出并抛出WakeupException
    • 如果循环运行在主线程中,可以在Runtime.getRuntime().addShutdownHook()方法中调用wakeup()方法
  • 消费者可以不订阅主题(加入群组),而是直接调用partitionsFor(主题名)让主题返回所有分区,并直接用assign()方法把自己挂上去,然后从指定分区读取消息。这么操作可以避免发生再均衡,但新分区加入时不会收到提醒,要周期性调用partitionsFor()方法或添加分区后重启应用程序

第五章 深入Kafka

  • 集群成员关系
    • Kafka使用Zookeeper维护集群成员信息
    • broker启动的时候会通过创建临时节点把自己的ID注册到Zookeeper
    • Kafka组件订阅Zookeeper的/brokers/ids路径,当有broker加入或退出集群时,组件便会获得通知
    • broker停机、出现网络分区或长时间GC停顿时,会从Zookeeper上断开连接,启动时创建的临时节点会被移除,监听broker列表的Kafka组件会被告知
    • 关闭broker时,对应的节点也会消失,但ID会继续保留在其它数据结构中(比如主题的副本列表)。因此之后用相同的ID再启动一个新broker,它会自动加入集群,并拥有与旧broker相同的分区和主题
  • 控制器
    • 集群里每个启动的broker都会尝试在Zookeeper里创建一个临时节点/controller让自己成为控制器。但只有第一个会成功,后面的会收到异常,并转而在控制器节点上创建Zookeeper watch对象,以收到此节点的变更通知,这样可以确保集群中只有一个控制器存在
    • 旧控制器断开时,其它broker会通过watch收到消息,并把自己建立为新控制器,照样只有一个能成功。新控制器会通过Zookeeper的条件递增(?)操作获得一个更大的epoch值。其它broker知道后,若以后收到控制器发出的带更小epoch值的消息(来自旧控制器),会选择无视
    • 当broker离开集群时,控制器会为从属于这个broker的分区选择新首领,并将请求发送给新首领及现有跟随者的broker节点,其中包含了新首领及跟随者信息,随后新首领开始处理生产者与消费者请求,跟随者开始从新首领那复制消息
    • 当控制器发现新broker加入集群时,会使用broker ID检查其是否含有分区副本,若有,则控制器把变更通知发给所有broker,新broker上的副本开始从首领复制消息
  • 复制
    • 首领通过查看每个跟随者请求的偏移量来确定其同步状态
    • 持续请求得到的最新副本称为同步的副本。在首领失效时,只有同步副本有资格被选为新首领
    • 可以用replica.lag.time.max.ms参数来决定跟随者成为不同步副本的最大请求延迟时间
    • 在创建主题时选定的首领叫做首选首领(初代首领),首选首领的产生会考虑集群整体负载(不像后面提拔的首领谁行谁上),因此是作为首领的最优选项。默认情况下,Kafka会自动再均衡首领,即检查当前首领是不是首选首领,若不是,且首选首领的副本是同步副本,则触发一次选举,让原首领归位
    • 从分区的副本清单里可以找到首选首领(第一个)
  • 处理请求
    • broker会在监听的每个端口上创建Acceptor线程,每个线程会创建一个连接,并将其交给Processor线程处理(数量可配置)。
    • Processor负责从客户端获取请求消息,放入请求队列,并从响应队列中获取响应消息发送给客户端
    • 每个broker中会缓存全量元数据(主题、主题包含的分区、每个分区的副本、首领副本位置),客户端可以通过给任意broker发送元数据请求来获取真正需要请求的broker信息(一般会把结果缓存起来并定时刷新)
    • 客户端发送消息到错误的broker上时会收到“非首领”错误,会在重发请求前先刷新元数据
    • 首领副本broker收到生产请求时,会做以下验证:
      • 用户是否有主题写入权限
      • 请求中的acks值是否有效(0、1、all)
      • 如果acks=all,是否有足够多同步副本来保证可靠性(同步副本数量不足时broker可以拒绝处理新消息)
    • 当首领分区完成消息写入后,若acks为0或1则会立即返回响应,若为all,则将请求保存在“炼狱”缓冲区,直到所有跟随者复制完消息才返回响应消息
    • broker使用零拷贝技术给消费者发送消息,即直接把消息从文件系统缓存发送到网络通道
    • 大部分客户端只能读取已经被写入同步副本的消息,以保证可靠性,因此broker间的消息复制有可能拖慢客户端接收消息
  • 物理存储
    • Kafka的基本存储单元是分区
    • 分区的大小受到挂载点可用空间的限制(单个磁盘或RAID磁盘阵列)
    • 分配副本位置时除了考虑broker节点,还需要考虑机架位置(尽量别在一起)
    • 在单个broker上为分区分配文件目录时,均衡原则只会考虑该目录下分区数量,而不会考虑分区大小(要注意大小磁盘)
    • 前面已经讲过分区是会分片段存储的
    • 当前正在写入数据的片段叫做活跃片段(不会被删)
    • broker会为分区中每个片段打开一个句柄(即使不活跃?),也许会需要操作系统层面的调优
  • 文件格式
    • 生产者->broker->消费者,这个过程中消息的数据格式一致
    • 除了键值、偏移量外消息中还有一些元数据:消息大小、校验和、消息格式版本号、压缩算法、时间戳(可以配置生产者发送时间或broker接收时间)
    • 同一个批次的消息会被压缩到一起从生产者流动到消费者
  • 索引
    • Kafka为每个分区维护了一个索引,以支持根据偏移量快速访问消息。索引把偏移量映射到片段文件和偏移量在文件中的位置
    • 索引亦分段存储,所以删除消息时可以删除相应索引,如果索引损坏,Kafka会重新生成。因此索引可以正常删除
  • 清理
    • Kafka可以通过改变主题保留策略来让每个键只保留最新值,在某些场景下有用,如记录应用状态、记录客户地址
    • 每个日志片段可以分为干净部分(上一次清理留下来的数据)与污浊部分(上一次清理后写入的数据)
    • 启动了清理功能的Kafka会启动一个清理管理器线程与多个清理线程执行清理任务
    • 清理线程会选择污浊率较高的线程进行清理
    • 清理分区时会将污浊的数据的键-值存入map中,并将原有信息从旧到新与map信息比对,若map中存在信息的key,说明信息该更新了,若不存在,则说明已为最新。最新的信息会被复制到一个替换片段中,并在所有信息复制完成后与原始片段交换
    • 所有线程创建map的总大小可配置,每个map必须能放下一个完整的片段,否则会报错。此时要么增加map内存,要么减少线程数
  • 删除
    • 应用程序可以通过发送<键-null>消息(墓碑消息)来将该键对应的所有数据删除
    • 清理线程发现该消息时会将该键下的其它值全部删掉,只留null值
    • 墓碑消息会保留一段时间(可配置)以便消费者能发现,但消费者离线太久的情况下这个消息也可能在被发现前就不在了

第六章 可靠的数据传递

  • 可靠性保证
    • Kafka可以保证分区消息的顺序。同一个生产者前后往同一分区写消息,后写的消息偏移量会更大,且会被消费者后读取
    • 只有消息被写入所有同步副本(不一定要写入磁盘,所以这里有可能丢数据),才会被认为是“已提交”的
    • 只要还有一个副本活跃,提交的消息就不会丢
    • 可靠性配置可以对broker级别配置,也可以对主题级别配置,因此可以有不可靠的主题
    • 消费者只能读取已提交的消息
  • 跟随者副本成为同步副本的要求:
    • 与Zookeeper之间有活跃会话(通过心跳保活)
    • 在过去10s(可配置)内从首领那获取过消息
    • 在过去10s内从首领那获取过最新的消息(保证延迟)
  • 完全的首领选举:在选举过程中提交的数据同时存在于所有的同步副本上
  • 如果允许不同步的副本成为首领,则有可能丢消息,但能保证可用性(避免首领节点恢复时间长)
  • 可以设置最少同步副本数,若生产者写入数据时同步副本数不达标,会收到NotEnoughReplicasException异常,系统会变为只读状态
  • 生产者端丢消息的场景:
    1. acks设为1时,发送消息给首领,首领返回正确响应,但在给同步跟随者发送消息前首领挂了。此时跟随者的状态依然是同步状态,并会被提拔为首领,那么这条消息就不见了
    2. 即便acks设为all,当生产者发送消息给首领时,刚好碰到首领在选举,会收到一条“首领不可用”响应,若生产者不重试或正确处理,这条消息也丢了,并且跟broker没关系
    • 因此需要根据可靠性需求配置acks,并好好处理异常
  • 若要保证可靠性,最好让生产者碰到可重试错误(如首领不可用)时都采取重试处理(利用生产者的重试机制)
  • 生产者有可能因没有收到broker的确认响应消息而重试,导致重复消息。Kafka不支持Exactly-Once语义,应用程序一般会在消息中加入唯一标识符以保证不重复
  • 消费者端丢消息的场景:消费者提交了偏移量却未完成消息处理,挂掉后其它消费者接手时会忽略这些消息
  • 消费者可靠性配置:
    • group.id:每个id区分一个消费者群组,组内每个消费者读取到的是所有消息的子集。若想要消费者能消费所有消息,给它一个单独的id就行
    • auto.offset.reset:当消费者没有偏移量可提交(如第1次启动时)或请求的偏移量在broker上不存在时消费者采取的行为。earliest可能会导致重复消息,latest可能会导致丢失消息
    • enable.auto.commit:是否自动提交消息,若自动提交,有可能出现消息还没处理完就已经被提交的情况(导致丢数据),或是处理到一半就不处理也不提交了(导致重复数据)
  • 消费者遇到可重试错误时,只提交最后一个处理成功的偏移量,并把没处理好的消息保存到缓冲区里(免得被下一个轮询的数据覆盖),调用pause()方法让轮询不返回数据(以免缓冲区溢出),在保持轮询的同时尝试重试处理(不能停止轮询)。处理完后调用resume()方法继续从轮询中获取新数据
  • 遇到可重试错误时也可以将错误单独写入一个主题,并让一个消费者去专门负责重试
  • 碰到轮询期间的长时处理时,可以使用线程池来做这些处理(需要暂停数据获取,直到线程池完成处理)

第七章 构建数据管道

  • Kafka Connect
    • 为在Kafka与外部存储系统之间移动数据提供了可靠且可伸缩的方式
    • 为连接器提供了一级API与运行时(Connect负责运行连接器插件,后者负责移动数据)
    • Connect以worker进程(长时间持续作业)集群的方式运行,基于worker进程安装连接器插件。安装后可以通过REST API来管理和配置连接器
    • 连接器启动额外的task,以并行方式移动大量数据,有效利用工作节点资源
    • 数据源(source)的连接器负责从源系统读取数据,并提供给worker进程
    • 数据池(sink)的连接器负责从worker进程获取数据,并写入目标系统
    • 最好把Connect部署在独立于broker的服务器上。(所有服务器装上Kafka,其中部分服务器启动broker,剩余的启动Connect)
    • Connect进程的几个重要配置参数:
      • bootstrap.servers:列出了将要与Connect协同工作的broker服务器,连接器会向这些broker写入/读取数据,建议至少指定3个
      • group.id:相同id的worker属于同一个Connect集群,集群的连接器和它们的任务可以运行在任意一个worker上
      • key.converter和value.converter:消息的键和值使用的转换器,默认使用Kafka的JSONConverter。Connect可以处理存储在Kafka里的不同格式数据
    • 删除一个连接器会导致其他连接器重启任务,为了保持负载的均衡
  • 连接器和任务
    • 连接器插件实现了Connector API,包含两部分内容:
    1. 连接器:连接器负责三件事情:
      • 决定需要运行多少个任务
      • 按照任务来拆分数据复制
      • 从worker进程获取任务配置并将其传递下去(根据任务配置以及数据源的情况进行统计,确定任务数及每个任务负责的部分,将其分别写入单独的任务配置中返回。worker进程负责启动和配置任务)
    2. 任务
      • 负责将数据移入/移出Kafka
      • 任务在初始化时会得到由worker进程分配的源系统上下文(Source Context),其中包含了一个对象,可以将源系统记录的偏移量保存在上下文里。目标系统连接器的上下文,连接器可以用它们操作从Kafka接收的数据,如数据清理、错误重试、将偏移量保存至外部系统。初始化完成后便会依据配置启动工作。
      • 源系统任务对外部系统进行轮询,并返回记录给worker进程,worker再发送到kafka
      • 数据池任务通过worker进程接收来自kafka的记录,并将它们写入外部系统
  • worker进程
    • 是连接器和任务的容器
    • 负责处理HTTP请求(用于定义与配置连接器)
    • 负责保存连接器配置、启动连接器与连接器任务,并把配置传递给任务
    • worker进程可以随时加入、退出,都会触发连接器和任务的重分配。
    • 进程还负责提交偏移量,以便其它进程接手工作
  • 连接器和任务负责数据的移动,worker负责REST API、配置管理、可靠性、高可用、伸缩性与负载均衡等管理工作(关注点分离是优于普通客户端API的地方)
  • 转化器与Connect的数据模型
    • Connect API提供了一级数据API,其中包含了数据对象及用于描述数据的schema
    • 源连接器从源系统读取事件,并为每个事件生成schema和值(数据对象本身)
    • 目标连接器获取schema与值,用schema对值做解析,并写入目标系统
    • 源连接器、目标连接器与Kafka之间还有一个转换器,方便让数据以更合适的格式在Kafka中流动(Avro、JSON或字符串)
    • 只要有可用的转换器,连接器与数据类型可以自由组合
  • 偏移量管理
    • 因为worker进程提供偏移量管理服务,连接器可以依此通过Kafka API来维护偏移量
    • 源连接器给worker进程的数据中会包含一个逻辑分区与逻辑偏移量,表示数据在源系统中的位置。在设计源连接器时要考虑如何对源系统分区及跟踪偏移量(如JDBC可以用表作为分区,主键作为偏移量)
    • worker将记录成功写入kafka后,worker会把源系统偏移量保存下来(一般是写在一个主题下),以便崩溃后恢复状态
    • 目标连接器则是从Kafka读取包含主题、分区和偏移量的记录,然后调用连接器的put()方法,将记录保存到目标系统,并在成功后通过消费者客户端将偏移量提交到Kafka上(跟消费者行为比较像了)

第八章 跨集群数据镜像

  • 跨数据中心通信架构原则:
    • 每个数据中心至少要有一个集群
    • 每两个数据中心之间的数据复制要做到每个事件仅复制一次(除非出现错误需要重试)
    • 尽量从远程数据中心读取数据,而不是写入
  • Hub和Spoke架构
    • 一个中心Kafka集群对应多个本地Kafka集群
    • 数据只在本地数据中心生成
    • 每个数据中心的数据只会镜像到数据中心一次(单向)
    • 只处理单个数据中心数据的应用程序可以被部署在本地数据中心
    • 处理多个数据中心数据的应用程序需要被部署在中央数据中心
    • 区域数据中心无法互相访问数据
  • 双活架构
    • 每两个数据中心之间都需要进行镜像(双向)
    • 可以为就近用户提供服务保障性能,也具备完整数据的功能性
    • 可以作为冗余与弹性的容灾方案
    • 需要标记数据来源于哪个数据中心以避免循环镜像
  • 主备架构
    • 灾备集群不断从主集群中复制信息保持同步,只有主集群下线时才有上场机会
    • 主集群与备集群的偏移量不一定匹配,因为主集群写入与灾备集群镜像有时间差,也许主集群的消息已经过期了,灾备才开始镜像。即使主集群主题创建之后立即开始镜像,也有可能因为生产者进行重试而导致偏移量偏离,或是因消费者提交偏移量比提交记录早而导致偏移量偏小
    • 解决偏移量匹配问题的方法:
      • 基于时间的失效备援:消息中带时间戳,恢复处理时将失效时间往前推几分钟再从这个时间点拉取数据,会存在一些重复数据,且从时间点拉取数据的功能需要应用程序额外支持
      • 偏移量外部映射:用外部数据存储来保存集群之间的偏移量差值,在差值变化时更新数据
    • 主集群重新上线后,最好把自己的数据都删了再从灾备集群上镜像数据
    • 不建议直接将主集群地址硬编码到生产者/消费者的配置里,可以用DNS别名让其指向主集群,便随时可以指向灾备集群。或者直接用Zookeeper做集群发现。大多数情况下切换集群都需要重启消费者,以使它们找到新的偏移量来正确读取数据
  • 延展集群
    • 跨多个数据中心安装单Kafka集群
    • 直接用Kafka复制机制在broker间同步数据
    • 对带宽和延迟要求很高(除非数据中心都在一个区域)
  • MirrorMaker
    • 包含了一组消费者(出于历史原因,官方文档称之为流)及一个公共生产者(吞吐量不够就再起一个MirrorMaker进程)
    • 每个消费者被分配一个线程
    • 消费者每60秒(默认)通知生产者将数据发送到Kafka,等待确认后再通知源集群提交偏移量。即使MirrorMaker崩溃也只会有60秒重复数据
    • 尽量让MirrorMaker运行在目标数据中心里(远程读取比远程写入安全,免得丢数据)。如果需要加密传输,则可以运行在源数据中心,读取本地非加密数据,再用SSL连接发到远程数据中心

第九章管理Kafka和第十章监控Kafka暂时用不上

第十一章 流式处理

  • 数据流:无边界数据集的抽象表示
  • 事件流模型的属性:
    • 事件是有序的
    • 事件一旦发生便不能被改变
    • 事件流是可重播的(关键)
  • 时间窗口
    • 大部分针对流的操作都是基于时间窗口的,比如移动平均数、一周内销量最好的产品、系统的99百分位等
    • 两个流的合并操作也是基于时间窗口的
    • 窗口的属性:窗口大小、移动频率、可更新时间多长(Flink里的Watermark)
  • 流式处理的设计模式
    • 单个事件处理(又名map或filter)
      • 最基本的模式
      • 应用程序读取流中的事件,修改它们,把事件生成到另一个流上
      • 每一个事件都是独立处理,所以不涉及状态,易于从错误中恢复及负载均衡
    • 使用本地状态
      • 实现聚合操作需要维护流的状态
      • 需要考虑的问题:内存占用、持久化(以便重启或切换实例)、分区再均衡的状态传递
    • 多阶段处理和重分区
      • 在需要全局状态才能计算的任务(如全局TopN)中,需要将各分区的结果汇总到单分区,并在该分区上用本地状态做计算
      • 对搞Hive的人来说这个再熟悉不过了(我有点感动?)
    • 使用外部查找——流和表的连接
      • 有时需要在处理事件流中的事件时涉及外部数据读取,会带来严重延迟
      • 可以把外部数据库的信息缓存到流式处理应用程序中(那就肯定会有缓存一致性问题)
      • 可以捕捉数据库的变更事件,也形成事件流,让流式处理作业监听事件流,及时更新缓存,这个过程被称为CDC——变更数据捕捉(Change Data Capture)。Connect中的一些连接器可以执行CDC任务
    • 流与流的连接
      • 基于时间窗口的连接
      • 通过键来将流进行分区,让关联的数据分到一起(有点像Hive的Shuffle)
    • 乱序事件
      • 识别乱序事件:应用程序需要检查事件的时间,与当前时间比较
      • 规定乱序允许的时间段(Flink的Watermark)
      • 需要具备乱序消息处理能力(乱序消息与新到达消息一同处理)
      • 需要具备更新结果的能力,如果下游是数据库,可以直接put或者update
    • 重新处理
      • 两种场景下需要重新处理
      1. 对流式处理应用进行了改进,让新版本应用处理同一个流,生成新结果,并比较两种版本的结果,并让客户端从某个时间点切换到新的结果流
        • 让新版本应用作为新消费者群组
        • 让它从输入主题的第一个偏移量开始读数据(拥有自己的输入流副本)
        • 检查结果流,当新版本应用处理赶上进度时切换客户端应用程序接收的结果流
      2. 现有的流式应用有缺陷,修复后重新处理事件流并重新产生结果
        • 重置应用及本地状态
        • 让应用回到输入流的起始位置
        • 清理输出流
        • 比较麻烦
  • 每个流式应用程序至少会实现和执行一个拓扑(Topology),有点像Hive on Tez里的DAG
  • 拓扑中的节点叫处理器,大部分处理器都实现了一个数据操作——过滤、映射、聚合等
  • 流式应用的伸缩方式:主题有多少分区,就有多少任务。可以选择增加应用程序的线程数
  • Kafka会自动协调工作,为每个任务分配属于它们的分区,每个任务处理自己的分区,维护自己本地的与聚合相关的状态
  • 同一个连接操作涉及的主题必须要有相同数目的分区并基于join key进行分区,这样操作涉及的分区会全部分配给相同的任务
  • 如果要用别的join key,就需要重新分区。
  • 如果拓扑后面需要在其它键上做进一步聚合操作,Kafka Streams的处理方式是使用新的键和分区将事件写到新的主题,并启动任务从新主题读取和处理事件。这里的重新分区就是将原拓扑拆分为两个子拓扑,每个子拓扑都有自己的任务集。第二个子拓扑依赖第一个子拓扑产生的结果,但它们可以独立地运行,不用通信或共享资源
  • 选择流式框架需要考虑的问题
    • 摄取:是需要一个流式处理系统还是一个更简单专注于摄取的系统,如Kafka Connect。如果使用流式处理系统,确保它和目标系统都拥有可用的连接器
    • 延迟:如果要求毫秒级的延迟,请求与响应模式更合适,别用流了。硬要用流的话选择支持低延迟的模型,而不是基于微批次的模型
    • 异步微服务:需要支持本地存储,作为微服务数据的缓存和物化视图
    • 实时数据分析:也需要支持本地存储,为了支持高级聚合、时间窗口与连接