Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete snapshot implementation #275

Closed
TheR1sing3un opened this issue Feb 17, 2023 · 1 comment · Fixed by #298
Closed

Complete snapshot implementation #275

TheR1sing3un opened this issue Feb 17, 2023 · 1 comment · Fixed by #298

Comments

@TheR1sing3un
Copy link
Contributor

现状

目前已经实现了初步的快照,但是快照仅用于启动时的快速replay到状态机。目前会有如下的问题:
当leader给follower进行日志append时,目标日志已经因为快照生成而被删除,导致无法找到该日志,leader一直报错,并且follower也一直无法同步到这条日志,如果当前raft peers中过半follower都出现上述问题,那么整个集群将处于不可用的状态。

解决

我们需要实现完整的RAFT快照协议,也就是目前需要实现当follower需要同步leader已经被快照删除的日志的时候,leader需要直接发送当前最新的快照到follower,用于follower的快速同步。

论文解析

我们的日志肯定是不可以持续的增长下去的,因为当我们日志数量达到很大的时候,比如说我们的日志数据已经达到了几千万条的时候,我们和一个还没有多少数据的跟随者进行同步的话,需要将这些日志全部发送,其实是十分浪费资源和时间的。

那么我们其实可以使用快照,也就是对领袖某一个时刻它的状态机的数据进行保存,然后将这个快照发送给那些很落后的节点进行快速的同步,同时由于快照已经记录此时的所有必要数据,那么我们可以将这些日志删除,避免日志无限度的增长下去。

论文中的Figure 13是安装快照的RPC的参数和实现。

安装快照RPC

由领袖调用,用于发送一个快照的分块给跟随者。领袖领袖按照顺序发送分块

参数:

term 领袖的任期
leaderId 领袖的id,便于跟随者用于重定向客户端的请求
lastIncludedIndex 快照取代的所有的日志中最后一个日志的索引
lastIncludedTerm lastIncludedIndex处的日志的任期
offset 该分块在快照文件中的字节偏移量
data[] 从offset开始的分块的纯字节数据
done 如果是最后一个分块则为true

结果:

term 服务器的currentTerm,用于领袖更新自己的任期

接收者实现:

  1. 如果term < currentTerm则立马回复。
  2. 如果是第一个分块则创建一个新的快照文件。(offset*为0)
  3. 在给定的offset处开始写入数据。
  4. 如果done不为true,那么回复然后等待更多的数据分块传来。
  5. 保存快照文件,丢弃任何比lastIncludedIndex小的快照或者部分快照。
  6. 如果存在一个日志和快照最后包含的日志有着一样的索引和任期,那么保留这个日志以及其以后的日志,并回复。
  7. 丢弃所有日志。
  8. 使用快照的内容重置状态机。(以及加载快照的集群配置)

实现快照

快照生成

  1. DLedgerEntryPusher检测到当前某index可以commit,则调用StateMachineCaller的onCommit进行提交。
  2. StateMachineCaller等待该commit任务从任务队列中取出,然后开始执行doCommit方法。
  3. 调用StateMachine的onCommit用于在状态机中应用目前被提交但未被apply的日志。
  4. 调用SnapshotManager的saveSnapshot方法用于判断当前是否需要进行快照,以及后续的快照操作。
  5. 如果当前符合快照触发条件,那么调用SnapshotStore的createSnapshotWritter用于生成一个快照文件的writer。
  6. 生成一个钩子函数SnapshotSaveHook用于保存基本的快照元数据信息和writer对象,以及用于后续回调操作。
  7. 调用StateMachineCaller的onSnapshotSave将该快照保存任务放入任务队列。
  8. 当任务队列执行到该任务时,调用doSnapshotSave方法。
  9. 调用StateMachine的onSnapshotSave用于让状态机将自身状态生成一个快照。
  10. 将快照数据写入到SnapshotStore。
  11. StateMachineCaller在状态机执行完快照保存操作后,根据实际结果进行回调给SnapshotManager。
  12. 如果写入成功,则将DLedgerStore中被快照覆盖的数据进行reset,也就是删除。

快照加载

  1. DLedgerServer启动时,需要先尝试从快照中进行快速重放,也就是调用SnapshotManager的loadSnapshot方法。
  2. SnapshotManager尝试进行快照读取流程,先从SnapshotStore中创建一个snapshotReader用于从快照存储空间中读取快照元数据和实际数据。
  3. 生成一个snapshotLoadHook钩子函数,推进实际的快照读取任务以及读取之后的回调。
  4. 调用StateMachineCaller的onSnapshotLoad方法,生成一个快照读取任务,然后放入到任务队列。
  5. 当任务队列执行到该任务时,调用doSnapshotLoad方法用于实际的快照读取。
  6. snapshotReader中读取SnapshotStore中的该快照的元数据信息,判断该快照目前是否有效。
  7. 快照若有效,则调用StateMachine的onSnapshootLoad
  8. StateMachine从snapshotReader中读取SnapshotStore中的实际快照数据,然后更新自己的状态机。
  9. 根据快照读取结果,StateMachineCaller调用snapshotLoadHook的回调。
  10. 当正确应用了快照之后,需要更新DLedgerStore中的index等数据,也就是起始的log的索引从lasIncluedIndex+1开始。

快照安装

  1. 当leader节点的EntryDispatcher需要发送的日志已经因为快照被删除的时候,那么对目标follower发起一个InstallSnapshot的RPC请求,将从本地的SnapshotManager获取一个可用的快照数据,然后通过上述请求携带发送。
  2. follower节点的EntryHandler接收到该InstallSnapshot的请求,先进行一次有效判断,即判断leader身份和快照是否当前仍有效。
  3. 调用SnapshotManager的installSnapshot方法,发起一次快照安装。
  4. 先将快照的数据写入到SnapshotStore中一个临时目录下。
  5. 获取该快照数据的snapshotReader
  6. 生成一个Install类型的snapshotLoadHook,这里和普通的快照加载中的hook进行区分,因为读取后的回调函数逻辑不同。
  7. 调用StateMachineCaller的onSnapshotLoad方法将该任务入列。
  8. 该任务被执行到的时候调用doSnapshotLoad方法。
  9. 使用snapshotReader从SnapshotStore中读取元数据信息,判断该快照目前是否有效。
  10. 快照若有效,则调用StateMachine的onSnapshotLoad方法。
  11. 从SnapshotStore中读取快照数据,更新自己的状态机。
  12. 根据快照读取结果,StateMachineCaller调用类型为InstallsnapshotLoadHook回调函数。
  13. 此时若正确在状态机中加载了该快照,那么需要将快照目录从临时目录移动到正式目录,然后将lastIncludedIndex前的日志都清空,并且更新Raft的commitIndex

优化

快照发送

目前我们先实现直接通过一次request来发送所有的快照数据,但是实际生产环境下的快照数据都不会很小,一次请求就直接发送全部的数据不太现实,因此可以这里进行分chunk发送。

TheR1sing3un added a commit to TheR1sing3un/dledger that referenced this issue Feb 19, 2023
1. support protocol about install snapshot

Closes openmessaging#275
@tsunghanjacktsai
Copy link
Contributor

tsunghanjacktsai commented Feb 22, 2023

@TheR1sing3un Great idea. Please add further info on how you would implement the snapshot installation based on the architecture of DLedger if you have time.

TheR1sing3un added a commit to TheR1sing3un/dledger that referenced this issue Jun 4, 2023
1. support protocol about install snapshot

Closes openmessaging#275
TheR1sing3un added a commit to TheR1sing3un/dledger that referenced this issue Jun 4, 2023
1. support protocol about install snapshot

Closes openmessaging#275
TheR1sing3un added a commit to TheR1sing3un/dledger that referenced this issue Jul 9, 2023
1. support protocol about install snapshot

Closes openmessaging#275
TheR1sing3un added a commit to TheR1sing3un/dledger that referenced this issue Jul 9, 2023
1. support protocol about install snapshot

Closes openmessaging#275
RongtongJin pushed a commit that referenced this issue Jul 15, 2023
* feat(core): support protocol about install snapshot

1. support protocol about install snapshot

Closes #275

* feat(core): support protocol about install snapshot

1. support protocol about install snapshot

Closes #275

* feat(core): refactor basic dledger overall structure to make it more "raft" like

1. refactor basic dledger overall structure to make it more "raft" like

* feat(core): pass all original test

1. pass all original test

* feat(core): support batch append

1. support batch append

* fix(example): resolve conflicts after rebasing master

1. resolve conflicts after rebasing master

* fix(jepsen): resolve conflicts about jepsen after rebasing master

1. resolve conflicts about jepsen after rebasing master

* fix(jepsen): fix type error

1. fix type error

* feat(core): support installing snapshot

1. support installing snapshot

* feat(core): support installing snapshot

1. support installing snapshot

* feat(jepsen): test snapshot in jepsen

1. test snapshot in jepsen

* test(core): polish flaky test

1. polish flaky test

* rerun

* feat(core): commit entry which is in current term

1. commit entry which is in current term

* rerun

* rerun

* rerun

* fix(core): make the field: position in DLedgerEntry meaningless

1. make the field: position in DLedgerEntry meaningless

* test(core): use different store base path for each ut

1. use different store base path for each ut

* test(core): use different store base path for each ut

1. use different store base path for each ut

* rerun

* test(core): use different store base path for each ut

1. use different store base path for each ut

* test(core): use different store base path for each ut

1. use different store base path for each ut

* test(core): use different store base path for each ut

1. use different store base path for each ut

* fix(core): update peer watermark when compare success

1. update peer watermark when compare success

* fix(core): fix

1. fix

* fix(core): fix

1. fix

* test(proxy): remove proxy test

1. remove proxy test

* feat(example): add batch append cmd

1. add batch append cmd

* fix(core): reuse forks

1. reuse forks

* chore(global): add more git ignore file

1. add more git ignore file

* build(global): set reuseForks to false

1.  set reuseForks to false

* rerun

* feat(core): clear pending map and set writeIndex when role dispatcher role change from append to compare

1. clear pending map and set writeIndex when role dispatcher role change
from append to compare

* rerun
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants