Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend Raft algorithm to support share witness #168

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
rn.raft.becomeFollower(1, None)
ents := make([]pb.Entry, len(peers))
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
var cct pb.ConfChangeType
if peer.IsWitness {
cct = pb.ConfChangeAddWitness
} else {
cct = pb.ConfChangeAddNode
}
cc := pb.ConfChange{Type: cct, NodeID: peer.ID, Context: peer.Context}
data, err := cc.Marshal()
if err != nil {
return err
Expand All @@ -74,7 +80,13 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
var cct pb.ConfChangeType
if peer.IsWitness {
cct = pb.ConfChangeAddWitness
} else {
cct = pb.ConfChangeAddNode
}
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: cct}.AsV2())
}
return nil
}
86 changes: 73 additions & 13 deletions confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker
for id := range incoming(cfg.Voters) {
outgoing(cfg.Voters)[id] = struct{}{}
}
cfg.Witnesses[1] = cfg.Witnesses[0]

if err := c.apply(&cfg, trk, ccs...); err != nil {
return c.err(err)
Expand Down Expand Up @@ -109,12 +110,14 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
for id := range outgoing(cfg.Voters) {
_, isVoter := incoming(cfg.Voters)[id]
_, isLearner := cfg.Learners[id]
isWitness := cfg.Witnesses[0] == id

if !isVoter && !isLearner {
if !isVoter && !isLearner && !isWitness {
delete(trk, id)
}
}
*outgoingPtr(&cfg.Voters) = nil
cfg.Witnesses[1] = 0
cfg.AutoLeave = false

return checkAndReturn(cfg, trk)
Expand Down Expand Up @@ -155,17 +158,25 @@ func (c Changer) apply(cfg *tracker.Config, trk tracker.ProgressMap, ccs ...pb.C
// here to ignore these.
continue
}

var err error
switch cc.Type {
case pb.ConfChangeAddNode:
c.makeVoter(cfg, trk, cc.NodeID)
err = c.makeVoter(cfg, trk, cc.NodeID)
case pb.ConfChangeAddLearnerNode:
c.makeLearner(cfg, trk, cc.NodeID)
err = c.makeLearner(cfg, trk, cc.NodeID)
case pb.ConfChangeRemoveNode:
c.remove(cfg, trk, cc.NodeID)
case pb.ConfChangeUpdateNode:
case pb.ConfChangeAddWitness:
err = c.makeWitness(cfg, trk, cc.NodeID)
default:
return fmt.Errorf("unexpected conf type %d", cc.Type)
}

if err != nil {
return err
}
}
if len(incoming(cfg.Voters)) == 0 {
return errors.New("removed all voters")
Expand All @@ -175,17 +186,23 @@ func (c Changer) apply(cfg *tracker.Config, trk tracker.ProgressMap, ccs ...pb.C

// makeVoter adds or promotes the given ID to be a voter in the incoming
// majority config.
func (c Changer) makeVoter(cfg *tracker.Config, trk tracker.ProgressMap, id uint64) {
func (c Changer) makeVoter(cfg *tracker.Config, trk tracker.ProgressMap, id uint64) error {
pr := trk[id]
if pr == nil {
c.initProgress(cfg, trk, id, false /* isLearner */)
return
c.initProgress(cfg, trk, id, false /* isLearner */, false /* isWitness */)
return nil
}

if pr.IsWitness {
return fmt.Errorf("cannot change witness to non-witness voter")
}

pr.IsLearner = false
nilAwareDelete(&cfg.Learners, id)
nilAwareDelete(&cfg.LearnersNext, id)
incoming(cfg.Voters)[id] = struct{}{}

return nil
}

// makeLearner makes the given ID a learner or stages it to be a learner once
Expand All @@ -201,14 +218,17 @@ func (c Changer) makeVoter(cfg *tracker.Config, trk tracker.ProgressMap, id uint
// simultaneously. Instead, we add the learner to LearnersNext, so that it will
// be added to Learners the moment the outgoing config is removed by
// LeaveJoint().
func (c Changer) makeLearner(cfg *tracker.Config, trk tracker.ProgressMap, id uint64) {
func (c Changer) makeLearner(cfg *tracker.Config, trk tracker.ProgressMap, id uint64) error {
pr := trk[id]
if pr == nil {
c.initProgress(cfg, trk, id, true /* isLearner */)
return
c.initProgress(cfg, trk, id, true /* isLearner */, false /* isWitness */)
return nil
}
if pr.IsLearner {
return
return nil
}
if pr.IsWitness {
return fmt.Errorf("cannot change witness to non-witness learner")
}
// Remove any existing voter in the incoming config...
c.remove(cfg, trk, id)
Expand All @@ -223,11 +243,35 @@ func (c Changer) makeLearner(cfg *tracker.Config, trk tracker.ProgressMap, id ui
nilAwareAdd(&cfg.LearnersNext, id)
} else {
pr.IsLearner = true
pr.IsWitness = false
nilAwareAdd(&cfg.Learners, id)
}

return nil
}

// remove this peer as a voter or learner from the incoming config.
// makeWitness adds the given ID as the witness of the incoming majority
// config. A witness is registered as a voter of the incoming config and is
// additionally recorded in cfg.Witnesses[0]; the incoming config may hold at
// most one witness.
//
// It returns an error if the incoming config already has a different witness,
// or if id is already tracked as a non-witness voter or learner.
func (c Changer) makeWitness(cfg *tracker.Config, trk tracker.ProgressMap, id uint64) error {
	if w := cfg.Witnesses[0]; w > 0 && w != id {
		return fmt.Errorf("cannot have more than one witness in incoming")
	}

	// From here on the incoming config either has no witness or already has
	// this one.
	pr := trk[id]
	if pr == nil {
		c.initProgress(cfg, trk, id, false /* isLearner */, true /* isWitness */)
		return nil
	}

	if !pr.IsWitness {
		return fmt.Errorf("cannot change non-witness voter/learner to witness")
	}

	// A witness Progress can outlive its removal from the incoming config:
	// remove() keeps the Progress while the ID is still an outgoing voter, but
	// clears Witnesses[0] and the incoming voter entry. Re-register the ID so
	// that re-adding the witness is not silently a no-op. Both writes are
	// idempotent when the witness is already part of the incoming config.
	incoming(cfg.Voters)[id] = struct{}{}
	cfg.Witnesses[0] = id

	return nil
}

// remove this peer as a voter or learner or witness from the incoming config.
func (c Changer) remove(cfg *tracker.Config, trk tracker.ProgressMap, id uint64) {
if _, ok := trk[id]; !ok {
return
Expand All @@ -236,17 +280,23 @@ func (c Changer) remove(cfg *tracker.Config, trk tracker.ProgressMap, id uint64)
delete(incoming(cfg.Voters), id)
nilAwareDelete(&cfg.Learners, id)
nilAwareDelete(&cfg.LearnersNext, id)
if id == cfg.Witnesses[0] {
cfg.Witnesses[0] = 0
}

// If the peer is still a voter in the outgoing config, keep the Progress.
if _, onRight := outgoing(cfg.Voters)[id]; !onRight {
delete(trk, id)
}
}

// initProgress initializes a new progress for the given node or learner.
func (c Changer) initProgress(cfg *tracker.Config, trk tracker.ProgressMap, id uint64, isLearner bool) {
// initProgress initializes a new progress for the given node or learner or witness.
func (c Changer) initProgress(cfg *tracker.Config, trk tracker.ProgressMap, id uint64, isLearner bool, isWitness bool) {
if !isLearner {
incoming(cfg.Voters)[id] = struct{}{}
if isWitness {
cfg.Witnesses[0] = id
}
} else {
nilAwareAdd(&cfg.Learners, id)
}
Expand All @@ -263,6 +313,7 @@ func (c Changer) initProgress(cfg *tracker.Config, trk tracker.ProgressMap, id u
Match: 0,
Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes),
IsLearner: isLearner,
IsWitness: isWitness,
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has had a chance to communicate with us.
Expand Down Expand Up @@ -328,6 +379,15 @@ func checkInvariants(cfg tracker.Config, trk tracker.ProgressMap) error {
}
}

for id, voters := range cfg.Voters {
w := cfg.Witnesses[id]
if w > 0 {
if _, ok := voters[w]; !ok {
return fmt.Errorf("%d is in Witnesses[%d] but not in Voters[%d]", w, id, id)
}
}
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion confchange/quick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func (confChanges) Generate(rand *rand.Rand, _ int) reflect.Value {
return 1 + uint64(num())
}
typ := func() pb.ConfChangeType {
return pb.ConfChangeType(rand.Intn(len(pb.ConfChangeType_name)))
// exclude witness from the test case
return pb.ConfChangeType(rand.Intn(len(pb.ConfChangeType_name) - 1))
}
return reflect.ValueOf(genCC(num, id, typ))
}
Expand Down
33 changes: 24 additions & 9 deletions confchange/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,20 @@ func toConfChangeSingle(cs pb.ConfState) (out []pb.ConfChangeSingle, in []pb.Con
// quorum=(1 2 3)&&(1 2 4 6) learners=(5) learners_next=(4)
//
// as desired.

for _, id := range cs.VotersOutgoing {
// If there are outgoing voters, first add them one by one so that the
// (non-joint) config has them all.
out = append(out, pb.ConfChangeSingle{
Type: pb.ConfChangeAddNode,
NodeID: id,
})
if id == cs.WitnessOutgoing {
out = append(out, pb.ConfChangeSingle{
Type: pb.ConfChangeAddWitness,
NodeID: cs.WitnessOutgoing,
})
} else {
out = append(out, pb.ConfChangeSingle{
Type: pb.ConfChangeAddNode,
NodeID: id,
})
}

}

Expand All @@ -72,12 +78,20 @@ func toConfChangeSingle(cs pb.ConfState) (out []pb.ConfChangeSingle, in []pb.Con
NodeID: id,
})
}

// Then we'll add the incoming voters and learners.
for _, id := range cs.Voters {
in = append(in, pb.ConfChangeSingle{
Type: pb.ConfChangeAddNode,
NodeID: id,
})
if id == cs.Witness {
in = append(in, pb.ConfChangeSingle{
Type: pb.ConfChangeAddWitness,
NodeID: cs.Witness,
})
} else {
in = append(in, pb.ConfChangeSingle{
Type: pb.ConfChangeAddNode,
NodeID: id,
})
}
}
for _, id := range cs.Learners {
in = append(in, pb.ConfChangeSingle{
Expand All @@ -93,6 +107,7 @@ func toConfChangeSingle(cs pb.ConfState) (out []pb.ConfChangeSingle, in []pb.Con
NodeID: id,
})
}

return out, in
}

Expand Down
12 changes: 12 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,18 @@ func (l *raftLog) firstIndex() uint64 {
return index
}

// entry returns a pointer to the log entry at index i.
//
// It panics if the entries starting at i cannot be read, or if the read
// yields no entries at all. Callers must therefore only pass an index that is
// present in the log.
func (l *raftLog) entry(i uint64) *pb.Entry {
	ents, err := l.entries(i, noLimit)
	if err != nil {
		panic(err)
	}
	// Guard the index below: without this check an empty result surfaces as
	// an opaque "index out of range" runtime panic.
	if len(ents) == 0 {
		panic("raftLog.entry: no log entry at the requested index")
	}
	return &ents[0]
}
// lastEntry returns a pointer to the entry stored at the log's last index,
// as reported by lastIndex. It is a thin wrapper around entry and shares its
// panic behavior.
func (l *raftLog) lastEntry() *pb.Entry {
	return l.entry(l.lastIndex())
}

func (l *raftLog) lastIndex() uint64 {
if i, ok := l.unstable.maybeLastIndex(); ok {
return i
Expand Down
Loading