From 42fdb852de3f4c67d83ba76b0cda837729bb32c9 Mon Sep 17 00:00:00 2001 From: James Yin Date: Tue, 27 Dec 2022 09:23:25 +0800 Subject: [PATCH] feat(store): raft support commit callback Signed-off-by: James Yin --- internal/store/block/block.go | 4 +- internal/store/block/raft/appender.go | 184 +++++------------------- internal/store/segment/replica.go | 4 +- internal/store/segment/server.go | 27 +++- raft/inflight.go | 67 +++++++++ raft/log.go | 14 +- raft/log_unstable.go | 6 +- raft/node.go | 102 +++++-------- raft/propose.go | 103 +++++++++++++ raft/raft.go | 200 ++++++++++++++++---------- 10 files changed, 417 insertions(+), 294 deletions(-) create mode 100644 raft/inflight.go create mode 100644 raft/propose.go diff --git a/internal/store/block/block.go b/internal/store/block/block.go index b3cf909d4..900e601ae 100644 --- a/internal/store/block/block.go +++ b/internal/store/block/block.go @@ -44,8 +44,10 @@ type Reader interface { Read(ctx context.Context, seq int64, num int) ([]Entry, error) } +type AppendCallback = func(seqs []int64, err error) + type Appender interface { - Append(ctx context.Context, entries ...Entry) ([]int64, error) + Append(ctx context.Context, entries []Entry, cb AppendCallback) } type Block interface { diff --git a/internal/store/block/raft/appender.go b/internal/store/block/raft/appender.go index 0d7451623..11d124312 100644 --- a/internal/store/block/raft/appender.go +++ b/internal/store/block/raft/appender.go @@ -24,7 +24,6 @@ import ( "time" // third-party libraries. - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" // first-party libraries. @@ -67,11 +66,6 @@ type peer struct { type LeaderChangedListener func(block, leader vanus.ID, term uint64) -type commitWaiter struct { - offset int64 - c chan struct{} -} - type Appender interface { block.Appender @@ -86,11 +80,6 @@ type appender struct { actx block.AppendContext appendMu sync.RWMutex - waiters []commitWaiter - commitIndex uint64 - commitOffset int64 - waitMu sync.Mutex - leaderID vanus.ID listener LeaderChangedListener @@ -116,7 +105,6 @@ func NewAppender( a := &appender{ raw: raw, - waiters: make([]commitWaiter, 0), listener: listener, log: raftLog, host: host, @@ -126,7 +114,6 @@ func NewAppender( tracer: tracing.NewTracer("store.block.raft.appender", trace.SpanKindInternal), } a.actx = a.raw.NewAppendContext(nil) - a.commitOffset = a.actx.WriteOffset() a.log.SetSnapshotOperator(a) a.host.Register(a.ID().Uint64(), a) @@ -145,9 +132,6 @@ func NewAppender( } a.node = raft.RestartNode(c) - // Access Commit after raft.RestartNode to ensure raft state is initialized. - a.commitIndex = a.log.HardState().Commit - go a.run(ctx) return a @@ -207,13 +191,6 @@ func (a *appender) run(ctx context.Context) { case rd := <-a.node.Ready(): rCtx, span := a.tracer.Start(ctx, "RaftReady", trace.WithNewRoot()) - var partial bool - stateChanged := !raft.IsEmptyHardState(rd.HardState) - if stateChanged { - // Wake up fast before writing logs. - partial = a.wakeup(rCtx, rd.HardState.Commit) - } - if len(rd.Entries) != 0 { log.Debug(rCtx, "Append entries to raft log.", map[string]interface{}{ "node_id": a.ID(), @@ -238,11 +215,7 @@ func (a *appender) run(ctx context.Context) { }) } - if stateChanged { - // Wake up after writing logs. - if partial { - _ = a.wakeup(rCtx, rd.HardState.Commit) - } + if !raft.IsEmptyHardState(rd.HardState) { log.Debug(rCtx, "Persist raft hard state.", map[string]interface{}{ "node_id": a.ID(), "hard_state": rd.HardState, @@ -336,40 +309,6 @@ func (a *appender) applyEntries(ctx context.Context, committedEntries []raftpb.E return committedEntries[num-1].Index } -// wakeup wakes up append requests to the smaller of the committed or last index. -func (a *appender) wakeup(ctx context.Context, commit uint64) (partial bool) { - _, span := a.tracer.Start(ctx, "wakeup", trace.WithAttributes( - attribute.Int64("commit", int64(commit)))) - defer span.End() - - li, _ := a.log.LastIndex() - if commit > li { - commit = li - partial = true - } - - if commit <= a.commitIndex { - return - } - a.commitIndex = commit - - for off := commit; off > 0; off-- { - pbEntries, err := a.log.Entries(off, off+1, 0) - if err != nil { - return - } - - pbEntry := pbEntries[0] - if pbEntry.Type == raftpb.EntryNormal && len(pbEntry.Data) > 0 { - frag := block.NewFragment(pbEntry.Data) - a.doWakeup(ctx, frag.EndOffset()) - return - } - } - - return partial -} - func (a *appender) becomeLeader(ctx context.Context) { ctx, span := a.tracer.Start(ctx, "becomeLeader") defer span.End() @@ -459,65 +398,65 @@ func (a *appender) reset(ctx context.Context) { } // Append implements block.raw. -func (a *appender) Append(ctx context.Context, entries ...block.Entry) ([]int64, error) { +func (a *appender) Append(ctx context.Context, entries []block.Entry, cb block.AppendCallback) { ctx, span := a.tracer.Start(ctx, "Append") defer span.End() - seqs, offset, err := a.append(ctx, entries) - if err != nil { - if errors.Is(err, errors.ErrSegmentFull) { - _ = a.waitCommit(ctx, offset) - } - return nil, err - } - - // Wait until entries is committed. - err = a.waitCommit(ctx, offset) - if err != nil { - return nil, err - } - - return seqs, nil -} - -func (a *appender) append(ctx context.Context, entries []block.Entry) ([]int64, int64, error) { - ctx, span := a.tracer.Start(ctx, "append") - defer span.End() - span.AddEvent("Acquiring append lock") a.appendMu.Lock() span.AddEvent("Got append lock") - defer a.appendMu.Unlock() - if !a.isLeader() { - return nil, 0, errors.ErrNotLeader + a.appendMu.Unlock() + cb(nil, errors.ErrNotLeader) + return } if a.actx.Archived() { - return nil, a.actx.WriteOffset(), errors.ErrSegmentFull + a.appendMu.Unlock() + cb(nil, errors.ErrSegmentFull) + return } seqs, frag, enough, err := a.raw.PrepareAppend(ctx, a.actx, entries...) if err != nil { - return nil, 0, err + a.appendMu.Unlock() + cb(nil, err) + return } - off := a.actx.WriteOffset() data, _ := block.MarshalFragment(ctx, frag) - if err = a.node.Propose(ctx, data); err != nil { - return nil, 0, err - } + var pds []raft.ProposeData if enough { - if frag, err = a.raw.PrepareArchive(ctx, a.actx); err == nil { - data, _ := block.MarshalFragment(ctx, frag) - _ = a.node.Propose(ctx, data) + if frag, err := a.raw.PrepareArchive(ctx, a.actx); err == nil { + archivedData, _ := block.MarshalFragment(ctx, frag) + pds = make([]raft.ProposeData, 2) // FIXME(james.yin): revert archived if propose failed. + pds[1] = raft.ProposeData{ + Data: archivedData, + } + } else { + pds = make([]raft.ProposeData, 1) } + } else { + pds = make([]raft.ProposeData, 1) } - return seqs, off, nil + pds[0] = raft.ProposeData{ + Data: data, + Callback: func(err error) { + if err != nil { + cb(nil, err) + } else { + cb(seqs, nil) + } + }, + } + + a.node.Propose(ctx, pds...) + + a.appendMu.Unlock() } func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) { @@ -527,57 +466,6 @@ func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) { _, _ = a.raw.CommitAppend(ctx, frags...) } -func (a *appender) waitCommit(ctx context.Context, offset int64) error { - ctx, span := a.tracer.Start(ctx, "waitCommit") - defer span.End() - - span.AddEvent("Acquiring wait lock") - a.waitMu.Lock() - span.AddEvent("Got wait lock") - - if offset <= a.commitOffset { - a.waitMu.Unlock() - return nil - } - - ch := make(chan struct{}) - a.waiters = append(a.waiters, commitWaiter{ - offset: offset, - c: ch, - }) - - a.waitMu.Unlock() - - // FIXME(james.yin): lost leader - select { - case <-ch: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func (a *appender) doWakeup(ctx context.Context, commit int64) { - _, span := a.tracer.Start(ctx, "doWakeup") - defer span.End() - - span.AddEvent("Acquiring wait lock") - a.waitMu.Lock() - span.AddEvent("Got wait lock") - - defer a.waitMu.Unlock() - - for len(a.waiters) != 0 { - waiter := a.waiters[0] - if waiter.offset > commit { - break - } - close(waiter.c) - a.waiters = a.waiters[1:] - } - a.commitOffset = commit -} - func (a *appender) Status() ClusterStatus { leader, term := a.leaderInfo() return ClusterStatus{ diff --git a/internal/store/segment/replica.go b/internal/store/segment/replica.go index f950565a6..b1649a029 100644 --- a/internal/store/segment/replica.go +++ b/internal/store/segment/replica.go @@ -80,8 +80,8 @@ func (r *replica) Read(ctx context.Context, seq int64, num int) ([]block.Entry, return r.raw.Read(ctx, seq, num) } -func (r *replica) Append(ctx context.Context, entries ...block.Entry) ([]int64, error) { - return r.appender.Append(ctx, entries...) +func (r *replica) Append(ctx context.Context, entries []block.Entry, cb block.AppendCallback) { + r.appender.Append(ctx, entries, cb) } func (r *replica) Status() *metapb.SegmentHealthInfo { diff --git a/internal/store/segment/server.go b/internal/store/segment/server.go index 4017293d4..475f6f2fb 100644 --- a/internal/store/segment/server.go +++ b/internal/store/segment/server.go @@ -134,6 +134,29 @@ type leaderInfo struct { term uint64 } +type appendResult struct { + seqs []int64 + err error +} + +type appendFuture chan appendResult + +func newAppendFuture() appendFuture { + return make(appendFuture, 1) +} + +func (af appendFuture) onAppended(seqs []int64, err error) { + af <- appendResult{ + seqs: seqs, + err: err, + } +} + +func (af appendFuture) wait() ([]int64, error) { + res := <-af + return res.seqs, res.err +} + type server struct { replicas sync.Map // vanus.ID, Replica @@ -667,7 +690,9 @@ func (s *server) AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb. metrics.WriteTPSCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(len(events))) metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(size)) - seqs, err := b.Append(ctx, entries...) + future := newAppendFuture() + b.Append(ctx, entries, future.onAppended) + seqs, err := future.wait() if err != nil { return nil, s.processAppendError(ctx, b, err) } diff --git a/raft/inflight.go b/raft/inflight.go new file mode 100644 index 000000000..9f590ba92 --- /dev/null +++ b/raft/inflight.go @@ -0,0 +1,67 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import "sort" + +type waiter struct { + index uint64 + cb ProposeCallback +} + +type inflight struct { + waiters []waiter +} + +func (in *inflight) append(index uint64, cb ProposeCallback) { + in.waiters = append(in.waiters, waiter{ + index: index, + cb: cb, + }) +} + +func (in *inflight) commitTo(index uint64) { + size := len(in.waiters) + if size == 0 { + return + } + n := sort.Search(size, func(i int) bool { + return in.waiters[i].index > index + }) + committed := in.waiters[:n] + in.waiters = in.waiters[n:] + // TODO(james.yin): invoke callbacks in other goroutine + for _, w := range committed { + w.cb(nil) + } +} + +func (in *inflight) truncateFrom(index uint64) { + size := len(in.waiters) + if size == 0 { + return + } + n := sort.Search(size, func(i int) bool { + return in.waiters[i].index >= index + }) + if n < size { + dropped := append([]waiter{}, in.waiters[n:]...) + in.waiters = in.waiters[:n] + // TODO(james.yin): invoke callbacks in other goroutine + for _, w := range dropped { + w.cb(ErrProposalDropped) + } + } +} diff --git a/raft/log.go b/raft/log.go index 08a0bc6ee..c7ad8dc74 100644 --- a/raft/log.go +++ b/raft/log.go @@ -25,6 +25,8 @@ type raftLog struct { // storage contains all stable entries since the last snapshot. storage Storage + inflight *inflight + // unstable contains all unstable entries and snapshot. // they will be saved into storage. unstable unstable @@ -125,12 +127,17 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 { if after := ents[0].Index - 1; after < l.committed { l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } - l.unstable.truncateAndAppend(ents) + + li, truncated := l.unstable.truncateAndAppend(ents) + start := ents[0].Index + if truncated { + l.inflight.truncateFrom(start) + } // Reset pending when any entry being persisted is truncated. - if start := ents[0].Index; l.pending > start { + if l.pending > start { l.pending = start } - return l.lastIndex() + return li } // findConflict finds the index of the conflict. @@ -295,6 +302,7 @@ func (l *raftLog) localCommitTo(tocommit uint64) { // l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, li) // } l.localCommitted = tocommit + l.inflight.commitTo(tocommit) } } diff --git a/raft/log_unstable.go b/raft/log_unstable.go index d15aa1b47..93612421f 100644 --- a/raft/log_unstable.go +++ b/raft/log_unstable.go @@ -122,7 +122,7 @@ func (u *unstable) restore(s pb.Snapshot) { u.snapshot = &s } -func (u *unstable) truncateAndAppend(ents []pb.Entry) { +func (u *unstable) truncateAndAppend(ents []pb.Entry) (li uint64, truncated bool) { after := ents[0].Index switch { case after == u.offset+uint64(len(u.entries)): @@ -135,13 +135,17 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) { // portion, so set the offset and replace the entries u.offset = after u.entries = ents + truncated = true default: // truncate to after and copy to u.entries // then append u.logger.Infof("truncate the unstable entries before index %d", after) u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...) u.entries = append(u.entries, ents...) + truncated = true } + li = ents[len(ents)-1].Index + return } func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { diff --git a/raft/node.go b/raft/node.go index e4910c9e9..cf13a5e07 100644 --- a/raft/node.go +++ b/raft/node.go @@ -139,9 +139,8 @@ type Node interface { Tick() // Campaign causes the Node to transition to candidate state and start campaigning to become leader. Campaign(ctx context.Context) error - // Propose proposes that data be appended to the log. Note that proposals can be lost without - // notice, therefore it is user's job to ensure proposal retries. - Propose(ctx context.Context, data []byte) error + // Propose proposes that data be appended to the log. + Propose(ctx context.Context, pds ...ProposeData) // ProposeConfChange proposes a configuration change. Like any proposal, the // configuration change may be dropped with or without an error being // returned. In particular, configuration changes are dropped unless the @@ -259,11 +258,6 @@ func RestartNode(c *Config) Node { return &n } -type msgWithResult struct { - m pb.Message - result chan error -} - type peersWithResult struct { peers []Peer result chan error @@ -271,7 +265,7 @@ type peersWithResult struct { // node is the canonical implementation of the Node interface type node struct { - propc chan msgWithResult + propc chan []ProposeData recvc chan pb.Message confc chan pb.ConfChangeV2 confstatec chan pb.ConfState @@ -290,7 +284,7 @@ type node struct { func newNode(rn *RawNode) node { return node{ - propc: make(chan msgWithResult), + propc: make(chan []ProposeData), recvc: make(chan pb.Message), confc: make(chan pb.ConfChangeV2), confstatec: make(chan pb.ConfState), @@ -330,7 +324,7 @@ func (n *node) Bootstrap(peers []Peer) error { } select { case ch <- bp: - //case <-ctx.Done(): + // case <-ctx.Done(): // return ctx.Err() case <-n.done: return ErrStopped @@ -340,7 +334,7 @@ func (n *node) Bootstrap(peers []Peer) error { if err != nil { return err } - //case <-ctx.Done(): + // case <-ctx.Done(): // return ctx.Err() case <-n.done: return ErrStopped @@ -349,7 +343,7 @@ func (n *node) Bootstrap(peers []Peer) error { } func (n *node) run() { - var propc chan msgWithResult + var propc chan []ProposeData var readyc chan Ready var advancec chan struct{} var rd Ready @@ -393,14 +387,8 @@ func (n *node) run() { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. - case pm := <-propc: - m := pm.m - m.From = r.id - err := r.Step(m) - if pm.result != nil { - pm.result <- err - close(pm.result) - } + case pds := <-propc: + r.Propose(pds...) case m := <-n.recvc: // filter out response message from unknown From. if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) { @@ -471,13 +459,33 @@ func (n *node) Tick() { } } -func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } +func (n *node) Campaign(ctx context.Context) error { + return n.step(ctx, pb.Message{Type: pb.MsgHup}) +} -func (n *node) Propose(ctx context.Context, data []byte) error { - ctx, span := n.tracer.Start(ctx, "Propose") - defer span.End() +func (n *node) Propose(ctx context.Context, pds ...ProposeData) { + span := trace.SpanFromContext(ctx) + span.AddEvent("raft.node.Propose() Start") + defer span.AddEvent("raft.node.Propose() End") - return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) + select { + case n.propc <- pds: + case <-ctx.Done(): + err := ctx.Err() + for i := range pds { + pd := &pds[i] + if pd.Callback != nil { + pd.Callback(err) + } + } + case <-n.done: + for i := range pds { + pd := &pds[i] + if pd.Callback != nil { + pd.Callback(ErrStopped) + } + } + } } func (n *node) Step(ctx context.Context, m pb.Message) error { @@ -508,53 +516,17 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error { return n.Step(ctx, msg) } -func (n *node) step(ctx context.Context, m pb.Message) error { - return n.stepWithWaitOption(ctx, m, false) -} - -func (n *node) stepWait(ctx context.Context, m pb.Message) error { - return n.stepWithWaitOption(ctx, m, true) -} - // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. -func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { - if m.Type != pb.MsgProp { - select { - case n.recvc <- m: - return nil - case <-ctx.Done(): - return ctx.Err() - case <-n.done: - return ErrStopped - } - } - ch := n.propc - pm := msgWithResult{m: m} - if wait { - pm.result = make(chan error, 1) - } - select { - case ch <- pm: - if !wait { - return nil - } - case <-ctx.Done(): - return ctx.Err() - case <-n.done: - return ErrStopped - } +func (n *node) step(ctx context.Context, m pb.Message) error { select { - case err := <-pm.result: - if err != nil { - return err - } + case n.recvc <- m: + return nil case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } - return nil } func (n *node) Ready() <-chan Ready { return n.readyc } diff --git a/raft/propose.go b/raft/propose.go new file mode 100644 index 000000000..94680d76f --- /dev/null +++ b/raft/propose.go @@ -0,0 +1,103 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + // standard libraries. + "context" + + // this project. + pb "github.com/linkall-labs/vanus/raft/raftpb" +) + +type ProposeCallback = func(error) + +type ProposeData struct { + Type pb.EntryType + Data []byte + Callback ProposeCallback + NoWaitCommit bool +} + +type ProposeDataOption func(cfg *ProposeData) + +func Data(data []byte) ProposeDataOption { + return func(pd *ProposeData) { + pd.Data = data + } +} + +func Callback(cb ProposeCallback) ProposeDataOption { + return func(pd *ProposeData) { + pd.Callback = cb + } +} + +func NoWaitCommit() ProposeDataOption { + return func(pd *ProposeData) { + pd.NoWaitCommit = true + } +} + +type ProposeOption func(cfg *ProposeData) + +func WithData(opts ...ProposeDataOption) ProposeOption { + return func(cfg *ProposeData) { + for _, opt := range opts { + opt(cfg) + } + } +} + +func Propose(ctx context.Context, n Node, opts ...ProposeOption) { + pds := make([]ProposeData, len(opts)) + for i, opt := range opts { + opt(&pds[i]) + } + n.Propose(ctx, pds...) +} + +type proposeFuture chan error + +func newProposeFuture() proposeFuture { + return make(proposeFuture, 1) +} + +func (pr proposeFuture) onProposed(err error) { + if err != nil { + pr <- err + } + close(pr) +} + +func (pf proposeFuture) wait() error { + return <-pf +} + +func Propose0(ctx context.Context, n Node, data []byte) error { + future := newProposeFuture() + n.Propose(ctx, ProposeData{Data: data, Callback: future.onProposed}) + return future.wait() +} + +func Propose1(ctx context.Context, n Node, data []byte) error { + future := newProposeFuture() + n.Propose(ctx, ProposeData{Data: data, Callback: future.onProposed, NoWaitCommit: true}) + return future.wait() +} + +func Propose2(ctx context.Context, n Node, data []byte) { + n.Propose(ctx, ProposeData{Data: data}) +} diff --git a/raft/raft.go b/raft/raft.go index 980d52278..3033f70d3 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -32,8 +32,10 @@ import ( ) // None is a placeholder node ID used when there is no leader. -const None uint64 = 0 -const noLimit = math.MaxUint64 +const ( + None uint64 = 0 + noLimit = math.MaxUint64 +) // Possible values for StateType. const ( @@ -258,6 +260,8 @@ type raft struct { // TODO(tbg): rename to trk. prs tracker.ProgressTracker + inflight inflight + state StateType // isLearner is true if the local raft node is a learner. @@ -305,8 +309,9 @@ type raft struct { randomizedElectionTimeout int disableProposalForwarding bool - tick func() - step stepFunc + tick func() + step stepFunc + propose proposeFunc logger Logger @@ -343,6 +348,7 @@ func newRaft(c *Config) *raft { readOnly: newReadOnly(c.ReadOnlyOption), disableProposalForwarding: c.DisableProposalForwarding, } + raftlog.inflight = &r.inflight cfg, prs, err := confchange.Restore(confchange.Changer{ Tracker: r.prs, @@ -693,6 +699,7 @@ func (r *raft) tickHeartbeat() { } func (r *raft) becomeFollower(term uint64, lead uint64) { + r.propose = proposeFollower r.step = stepFollower r.reset(term) r.tick = r.tickElection @@ -706,6 +713,7 @@ func (r *raft) becomeCandidate() { if r.state == StateLeader { panic("invalid transition [leader -> candidate]") } + r.propose = proposeCandidate r.step = stepCandidate r.reset(r.Term + 1) r.tick = r.tickElection @@ -722,6 +730,7 @@ func (r *raft) becomePreCandidate() { // Becoming a pre-candidate changes our step functions and state, // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. + r.propose = proposeCandidate r.step = stepCandidate r.prs.ResetVotes() r.tick = r.tickElection @@ -735,6 +744,7 @@ func (r *raft) becomeLeader() { if r.state == StateFollower { panic("invalid transition [follower -> leader]") } + r.propose = proposeLeader r.step = stepLeader r.reset(r.Term) r.tick = r.tickHeartbeat @@ -856,6 +866,116 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected return r.prs.TallyVotes() } +func (r *raft) Propose(pds ...ProposeData) { + ents := make([]pb.Entry, len(pds)) + for i := range pds { + pd := &pds[i] + ents[i] = pb.Entry{ + Type: pd.Type, + Data: pd.Data, + } + } + err := r.propose(r, pb.Message{ + Type: pb.MsgProp, + From: r.id, + Entries: ents, + }) + for i := range pds { + pd := &pds[i] + if pd.Callback == nil { + continue + } + if err != nil { + pd.Callback(err) + } else if pd.NoWaitCommit { + pd.Callback(nil) + } else { + r.inflight.append(ents[i].Index, pd.Callback) + } + } +} + +type proposeFunc func(r *raft, m pb.Message) error + +func proposeLeader(r *raft, m pb.Message) error { + if len(m.Entries) == 0 { + r.logger.Panicf("%x stepped empty MsgProp", r.id) + } + if r.prs.Progress[r.id] == nil { + // If we are not currently a member of the range (i.e. this node + // was removed from the configuration while serving as leader), + // drop any new proposals. + return ErrProposalDropped + } + if r.leadTransferee != None { + r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) + return ErrProposalDropped + } + + for i := range m.Entries { + e := &m.Entries[i] + var cc pb.ConfChangeI + if e.Type == pb.EntryConfChange { + var ccc pb.ConfChange + if err := ccc.Unmarshal(e.Data); err != nil { + panic(err) + } + cc = ccc + } else if e.Type == pb.EntryConfChangeV2 { + var ccc pb.ConfChangeV2 + if err := ccc.Unmarshal(e.Data); err != nil { + panic(err) + } + cc = ccc + } + if cc != nil { + alreadyPending := r.pendingConfIndex > r.raftLog.applied + alreadyJoint := len(r.prs.Config.Voters[1]) > 0 + wantsLeaveJoint := len(cc.AsV2().Changes) == 0 + + var refused string + if alreadyPending { + refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied) + } else if alreadyJoint && !wantsLeaveJoint { + refused = "must transition out of joint config first" + } else if !alreadyJoint && wantsLeaveJoint { + refused = "not in joint state; refusing empty conf change" + } + + if refused != "" { + r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused) + m.Entries[i] = pb.Entry{Type: pb.EntryNormal} + } else { + r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 + } + } + } + + if !r.appendEntry(m.Entries...) { + return ErrProposalDropped + } + r.bcastAppend() + return nil +} + +func proposeCandidate(r *raft, m pb.Message) error { + r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) + return ErrProposalDropped +} + +func proposeFollower(r *raft, m pb.Message) error { + if r.lead == None { + r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) + return ErrProposalDropped + } else if r.disableProposalForwarding { + r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) + return ErrProposalDropped + } + m.To = r.lead + r.send(m) + return nil +} + func (r *raft) Step(m pb.Message) error { // Handle the message term, which may result in our stepping down to a follower. switch { @@ -1030,64 +1150,7 @@ func stepLeader(r *raft, m pb.Message) error { }) return nil case pb.MsgProp: - if len(m.Entries) == 0 { - r.logger.Panicf("%x stepped empty MsgProp", r.id) - } - if r.prs.Progress[r.id] == nil { - // If we are not currently a member of the range (i.e. this node - // was removed from the configuration while serving as leader), - // drop any new proposals. - return ErrProposalDropped - } - if r.leadTransferee != None { - r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) - return ErrProposalDropped - } - - for i := range m.Entries { - e := &m.Entries[i] - var cc pb.ConfChangeI - if e.Type == pb.EntryConfChange { - var ccc pb.ConfChange - if err := ccc.Unmarshal(e.Data); err != nil { - panic(err) - } - cc = ccc - } else if e.Type == pb.EntryConfChangeV2 { - var ccc pb.ConfChangeV2 - if err := ccc.Unmarshal(e.Data); err != nil { - panic(err) - } - cc = ccc - } - if cc != nil { - alreadyPending := r.pendingConfIndex > r.raftLog.applied - alreadyJoint := len(r.prs.Config.Voters[1]) > 0 - wantsLeaveJoint := len(cc.AsV2().Changes) == 0 - - var refused string - if alreadyPending { - refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied) - } else if alreadyJoint && !wantsLeaveJoint { - refused = "must transition out of joint config first" - } else if !alreadyJoint && wantsLeaveJoint { - refused = "not in joint state; refusing empty conf change" - } - - if refused != "" { - r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused) - m.Entries[i] = pb.Entry{Type: pb.EntryNormal} - } else { - r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 - } - } - } - - if !r.appendEntry(m.Entries...) { - return ErrProposalDropped - } - r.bcastAppend() - return nil + return proposeLeader(r, m) case pb.MsgLogResp: if r.raftLog.stableTo(m.Index, m.LogTerm) { r.prs.Progress[r.id].MaybeUpdate(m.Index) @@ -1410,8 +1473,7 @@ func stepCandidate(r *raft, m pb.Message) error { } switch m.Type { case pb.MsgProp: - r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return ErrProposalDropped + return proposeCandidate(r, m) case pb.MsgApp: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleAppendEntries(m) @@ -1450,15 +1512,7 @@ func stepCandidate(r *raft, m pb.Message) error { func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgProp: - if r.lead == None { - r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return ErrProposalDropped - } else if r.disableProposalForwarding { - r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) - return ErrProposalDropped - } - m.To = r.lead - r.send(m) + return proposeFollower(r, m) case pb.MsgApp: r.electionElapsed = 0 r.lead = m.From