Skip to content

v2.4版本对接change stream架构设计

Vinllen Chen edited this page Mar 31, 2020 · 7 revisions

v2.4支持change stream对接,但只针对源端版本是大于4.0.1的版本,对于早于4.0.1的版本,不支持。

新功能简介

  1. change stream对接。可以解决sharding move chunk的问题,不关闭balancer就可以迁移sharding。
  2. 对配置文件和checkpoint添加版本号机制,如果是从2.2及以下版本升级上来的,需要重新配置,不能兼容之前版本
  3. 优化孤儿文档的处理。
  4. 在配置文件和restful接口中屏蔽密码信息。
  5. 全量增加full_sync.executor.insert_on_dup_update参数,当目的端已经存在数据,可以把insert改为update。
  6. 增加full_sync.create_index选项,目前支持none(同步结束不创建索引)和foreground(同步结束创建前台索引),后续版本可能会考虑支持background。
  7. 临时关闭2.2.1支持的增量持久化功能,将会在后续小版本进行开放,敬请关注。

change stream存在的不足:

  1. 性能来说,change stream的性能整体弱于oplog直接拉取的方式,压测结果显示平均性能差异在2-3倍。另外,MongoShake内部也在change stream和oplog中间进行转换,带来了cpu的开销。
  2. change stream目前仅支持drop database的DDL,对于create index/drop index等都不支持,后续MongoDB官方还会持续优化change stream。

架构调整

在v2.2及之前版本,MongoShake的数据流依次如下:

  1. MongoDB oplog。
  2. oplog_reader。负责从源端采用tail的方式拉取oplog,如果失败会释放迭代器并重新建立。在v2.2版本,这里加了全量期间增量持久化的功能,也就是说,全量期间拉取到的oplog将不会发送到下游组件,而是采用disqueue组件写到本地持久化文件。关于v2.2增量持久化的设计参考v2.2版本全量期间增量持久化。
  3. pending queue。队列里面存的是未解析的bson raw格式的数据。
  4. logs queue。存储已经解析成oplog的数据。
  5. batcher。负责数据聚合和分发,以及处理checkpoint。
  6. worker。发送到不同tunnel的线程。

数据流图(红色是队列,黑色是工作线程,只画出与本文有关的内容): 2.2数据流图

v2.4版本增加change stream对接的功能,如下图所示(虚线框是本次架构调整的组件)。

  1. change_stream_reader负责采用从源端采用change stream的方式接受数据
  2. 增量持久化组件后移到persist_handler,架构上更加清晰一点。为了与全量同步解耦合,顺便也为以后增量同步期间由于流量大,或者外部开关等因素开启增量备份做准备。另外也考虑到了如果还是存储在oplog_reader里面,需要在change_stream_reader也实现同样的功能,代码上存在冗余。假如没有开启增量持久化,那么将会直接把数据推到下游的pending queue。
  3. 下游的pending queue里面原来存储的是bons.Raw格式的未解析的数据,现在改为[]byte数组,这是为了增量持久化组件后移处理对齐(存储到磁盘上的都是裸的[]byte)。对上游来说,oplog_reader和change_stream_reader采用的是不同的driver,解析后的bson.Raw无法直接对齐,改为[]byte通用格式更加合适,且不存在解析开销。
  4. 数据从pending queue流出后,如果不是change stream方式是直接解析成oplog写入到logs queue。
  5. Translate queue是用于change stream event翻译成Oplog用的。如果源端不是通过change stream拉取会直接写入到Logs queue。
  6. pending_queue:translate_queue:logs_queue = 1:1:1,继续沿用之前并行解析保证同步速率。

2.4数据流图

架构带来的风险评估:

  1. 内存使用上涨。由于多引入了一个translate queue,内存会增大,但是这个queue size不会很大,所以带来的内存可控。
  2. persist_handler对增量持久化,涉及到磁盘的读/写,性能上会降低。考虑到这个启用的时候要么是在全量阶段,要么是在增量的流量高峰期(目前不会有这种情况),带来的性能影响较低。
  3. persist_handler持久化后,重启导致的数据丢失。persist_handler用的是disk queue的开源组件,该组件在数据读取后将会删除,可能会导致重启数据的丢失。v2.2版本的处理是每读一批数据,就更新一下checkpoint,这种方式会降低数据丢失的概率,但是问题依然存在(在读取完,数据没有发送到目的端的过程中发生中断),而且这种每次都强刷checkpoint并等待的策略,性能比较低下。所以,后面还会对这个开源组件代码进行调整,读取完不删除,外部触发删除的逻辑。

checkpoint

由于change stream是采用resumeToken进行断点续传的,而原来v2.2以前的版本是采用ts进行断点续传的,那么对于change stream对接后,到底checkpoint是采用resumeToken还是ts方式呢?以下是2种方式的优缺点。

1. resumeToken

  1. 不需要对事务进行合并成一个oplog。因为resumeToken自带applyOpsIndex标记当时是applyOps数组里面的第几个元素,那么就可以做断点续传。
  2. checkpoint的CRUD逻辑需要修改
  3. 不支持rpc/tcp通道,但支持direct/kafka通道。这是因为rpc和tcp是采用异步确认的方式,worker发送后,对端异步写入,然后回复一批oplog的最新的ts表示该ts之前的数据都已经成功写入了,而这个ts也就是checkpoint,如果采用resumeToken,那么这个信息并没有携带在发送的oplog里面,需要在头部字段修改,此处有修改的成本。

2. ts

  1. 需要对收到的一批事务合并成一个applyOps的oplog。因为这一批事务的ts都是一样的,比如1,2,3这三条都是t1时刻,如果1写成功了,checkpoint可能更新到t1,然后这时候MongoShake挂了,那么启动后下次将从t1之后开始拉取,2,3就被跳过了从而造成数据丢失。
  2. checkpoint的CRUD逻辑不需要修改。
  3. 支持tcp/rpc/kafka/direct。
  4. 断点续传的时候,需要根据ts构建resumeToken,同理也需要根据resumeToken解析出ts进行checkpoint持久化。这里需要看下MongoDB本身对于resumeToken的封装/解封装逻辑,依照进行重新实现。这里即使checkpoint是resumeToken,“根据ts构建resumeToken”理论上也是要做的,支持用户指定ts进行同步,也支持全量结束后,增量根据给定的位点开始拉取;“根据resumeToken解析出ts”理论上也是要做的,监控打印同步的位点信息。“根据ts构建resumeToken”在4.0里面提供了startAtOperationTime参数,可以直接作为起始的位点。“根据resumeToken解析出ts”这部分也不需要做,因为拉取的event里面有clusterTime字段,可以直接解析出timestamp字段。

综合考虑,选型采用第二种ts的方式继续作为checkpoint。

附录

ResumeToken结构:

struct ResumeTokenData {
    Timestamp clusterTime;
    int version = 0;
    size_t applyOpsIndex = 0;
    Value documentKey;
    boost::optional<UUID> uuid;
};

参考:

Clone this wiki locally