Skip to content

Commit

Permalink
feat(store): raft support commit callback
Browse files Browse the repository at this point in the history
Signed-off-by: James Yin <[email protected]>
  • Loading branch information
ifplusor committed Dec 28, 2022
1 parent b3f905f commit 42fdb85
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 294 deletions.
4 changes: 3 additions & 1 deletion internal/store/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ type Reader interface {
Read(ctx context.Context, seq int64, num int) ([]Entry, error)
}

// AppendCallback is the completion callback accepted by Appender.Append.
// It receives the sequence numbers of the appended entries together with a
// nil error, or nil sequence numbers together with a non-nil error.
type AppendCallback = func(seqs []int64, err error)

type Appender interface {
Append(ctx context.Context, entries ...Entry) ([]int64, error)
Append(ctx context.Context, entries []Entry, cb AppendCallback)
}

type Block interface {
Expand Down
184 changes: 36 additions & 148 deletions internal/store/block/raft/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

// third-party libraries.
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

// first-party libraries.
Expand Down Expand Up @@ -67,11 +66,6 @@ type peer struct {

// LeaderChangedListener is a callback that receives a block ID, a leader ID
// and a term.
// NOTE(review): presumably invoked when the raft leader of a block changes;
// the call site is not visible here — confirm.
type LeaderChangedListener func(block, leader vanus.ID, term uint64)

// commitWaiter is one pending waitCommit call queued on appender.waiters.
type commitWaiter struct {
// offset is the offset the caller waits for; doWakeup releases the waiter
// once the commit offset it is given is not smaller than this value.
offset int64
// c is closed by doWakeup to release the waiting goroutine.
c chan struct{}
}

type Appender interface {
block.Appender

Expand All @@ -86,11 +80,6 @@ type appender struct {
actx block.AppendContext
appendMu sync.RWMutex

waiters []commitWaiter
commitIndex uint64
commitOffset int64
waitMu sync.Mutex

leaderID vanus.ID
listener LeaderChangedListener

Expand All @@ -116,7 +105,6 @@ func NewAppender(

a := &appender{
raw: raw,
waiters: make([]commitWaiter, 0),
listener: listener,
log: raftLog,
host: host,
Expand All @@ -126,7 +114,6 @@ func NewAppender(
tracer: tracing.NewTracer("store.block.raft.appender", trace.SpanKindInternal),
}
a.actx = a.raw.NewAppendContext(nil)
a.commitOffset = a.actx.WriteOffset()

a.log.SetSnapshotOperator(a)
a.host.Register(a.ID().Uint64(), a)
Expand All @@ -145,9 +132,6 @@ func NewAppender(
}
a.node = raft.RestartNode(c)

// Access Commit after raft.RestartNode to ensure raft state is initialized.
a.commitIndex = a.log.HardState().Commit

go a.run(ctx)

return a
Expand Down Expand Up @@ -207,13 +191,6 @@ func (a *appender) run(ctx context.Context) {
case rd := <-a.node.Ready():
rCtx, span := a.tracer.Start(ctx, "RaftReady", trace.WithNewRoot())

var partial bool
stateChanged := !raft.IsEmptyHardState(rd.HardState)
if stateChanged {
// Wake up fast before writing logs.
partial = a.wakeup(rCtx, rd.HardState.Commit)
}

if len(rd.Entries) != 0 {
log.Debug(rCtx, "Append entries to raft log.", map[string]interface{}{
"node_id": a.ID(),
Expand All @@ -238,11 +215,7 @@ func (a *appender) run(ctx context.Context) {
})
}

if stateChanged {
// Wake up after writing logs.
if partial {
_ = a.wakeup(rCtx, rd.HardState.Commit)
}
if !raft.IsEmptyHardState(rd.HardState) {
log.Debug(rCtx, "Persist raft hard state.", map[string]interface{}{
"node_id": a.ID(),
"hard_state": rd.HardState,
Expand Down Expand Up @@ -336,40 +309,6 @@ func (a *appender) applyEntries(ctx context.Context, committedEntries []raftpb.E
return committedEntries[num-1].Index
}

// wakeup releases pending append requests up to min(commit, last log index).
//
// The requested commit index is clamped to the last index of the raft log;
// when clamping happens the result is true, telling the caller that only part
// of the committed range could be handled. If the clamped index does not
// advance a.commitIndex nothing else is done. Otherwise a.commitIndex is
// updated and the log is scanned backwards from the new commit index for the
// first normal entry that carries data; the end offset of its fragment is
// handed to doWakeup. The scan stops silently when the log cannot be read.
func (a *appender) wakeup(ctx context.Context, commit uint64) (partial bool) {
	// The span records the commit index as requested, before any clamping.
	commitAttr := attribute.Int64("commit", int64(commit))
	_, span := a.tracer.Start(ctx, "wakeup", trace.WithAttributes(commitAttr))
	defer span.End()

	if last, _ := a.log.LastIndex(); last < commit {
		commit, partial = last, true
	}

	if a.commitIndex >= commit {
		return partial
	}
	a.commitIndex = commit

	for idx := commit; idx != 0; idx-- {
		ents, err := a.log.Entries(idx, idx+1, 0)
		if err != nil {
			break
		}

		ent := ents[0]
		if ent.Type != raftpb.EntryNormal || len(ent.Data) == 0 {
			// Not a data-carrying entry, keep looking further back.
			continue
		}

		frag := block.NewFragment(ent.Data)
		a.doWakeup(ctx, frag.EndOffset())
		break
	}

	return partial
}

func (a *appender) becomeLeader(ctx context.Context) {
ctx, span := a.tracer.Start(ctx, "becomeLeader")
defer span.End()
Expand Down Expand Up @@ -459,65 +398,65 @@ func (a *appender) reset(ctx context.Context) {
}

// Append implements block.Appender.
func (a *appender) Append(ctx context.Context, entries ...block.Entry) ([]int64, error) {
func (a *appender) Append(ctx context.Context, entries []block.Entry, cb block.AppendCallback) {
ctx, span := a.tracer.Start(ctx, "Append")
defer span.End()

seqs, offset, err := a.append(ctx, entries)
if err != nil {
if errors.Is(err, errors.ErrSegmentFull) {
_ = a.waitCommit(ctx, offset)
}
return nil, err
}

// Wait until the entries are committed.
err = a.waitCommit(ctx, offset)
if err != nil {
return nil, err
}

return seqs, nil
}

func (a *appender) append(ctx context.Context, entries []block.Entry) ([]int64, int64, error) {
ctx, span := a.tracer.Start(ctx, "append")
defer span.End()

span.AddEvent("Acquiring append lock")
a.appendMu.Lock()
span.AddEvent("Got append lock")

defer a.appendMu.Unlock()

if !a.isLeader() {
return nil, 0, errors.ErrNotLeader
a.appendMu.Unlock()
cb(nil, errors.ErrNotLeader)
return
}

if a.actx.Archived() {
return nil, a.actx.WriteOffset(), errors.ErrSegmentFull
a.appendMu.Unlock()
cb(nil, errors.ErrSegmentFull)
return
}

seqs, frag, enough, err := a.raw.PrepareAppend(ctx, a.actx, entries...)
if err != nil {
return nil, 0, err
a.appendMu.Unlock()
cb(nil, err)
return
}
off := a.actx.WriteOffset()

data, _ := block.MarshalFragment(ctx, frag)
if err = a.node.Propose(ctx, data); err != nil {
return nil, 0, err
}

var pds []raft.ProposeData
if enough {
if frag, err = a.raw.PrepareArchive(ctx, a.actx); err == nil {
data, _ := block.MarshalFragment(ctx, frag)
_ = a.node.Propose(ctx, data)
if frag, err := a.raw.PrepareArchive(ctx, a.actx); err == nil {
archivedData, _ := block.MarshalFragment(ctx, frag)
pds = make([]raft.ProposeData, 2)
// FIXME(james.yin): revert archived if propose failed.
pds[1] = raft.ProposeData{
Data: archivedData,
}
} else {
pds = make([]raft.ProposeData, 1)
}
} else {
pds = make([]raft.ProposeData, 1)
}

return seqs, off, nil
pds[0] = raft.ProposeData{
Data: data,
Callback: func(err error) {
if err != nil {
cb(nil, err)
} else {
cb(seqs, nil)
}
},
}

a.node.Propose(ctx, pds...)

a.appendMu.Unlock()
}

func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
Expand All @@ -527,57 +466,6 @@ func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
_, _ = a.raw.CommitAppend(ctx, frags...)
}

// waitCommit blocks until the commit offset reaches offset or ctx is done.
//
// It returns nil immediately when offset is not beyond a.commitOffset.
// Otherwise it queues a commitWaiter on a.waiters and waits for its channel
// to be closed, returning nil, or for ctx to finish, returning ctx.Err().
func (a *appender) waitCommit(ctx context.Context, offset int64) error {
	ctx, span := a.tracer.Start(ctx, "waitCommit")
	defer span.End()

	// enqueue registers a waiter under waitMu. A nil channel means the offset
	// is already committed and there is nothing to wait for.
	enqueue := func() <-chan struct{} {
		span.AddEvent("Acquiring wait lock")
		a.waitMu.Lock()
		span.AddEvent("Got wait lock")
		defer a.waitMu.Unlock()

		if a.commitOffset >= offset {
			return nil
		}

		done := make(chan struct{})
		a.waiters = append(a.waiters, commitWaiter{offset: offset, c: done})
		return done
	}

	done := enqueue()
	if done == nil {
		return nil
	}

	// FIXME(james.yin): lost leader
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-done:
		return nil
	}
}

// doWakeup releases every queued waiter whose offset is not beyond commit and
// records commit as the new a.commitOffset.
//
// Waiters are examined from the front of a.waiters and the scan stops at the
// first one that is still ahead of commit. Each released waiter has its
// channel closed and is dropped from the queue. Runs under waitMu.
func (a *appender) doWakeup(ctx context.Context, commit int64) {
	_, span := a.tracer.Start(ctx, "doWakeup")
	defer span.End()

	span.AddEvent("Acquiring wait lock")
	a.waitMu.Lock()
	span.AddEvent("Got wait lock")

	defer a.waitMu.Unlock()

	// Count the leading waiters that can be released, closing each as we go,
	// then drop that prefix from the queue in one step.
	woken := 0
	for woken < len(a.waiters) && a.waiters[woken].offset <= commit {
		close(a.waiters[woken].c)
		woken++
	}
	a.waiters = a.waiters[woken:]

	a.commitOffset = commit
}

func (a *appender) Status() ClusterStatus {
leader, term := a.leaderInfo()
return ClusterStatus{
Expand Down
4 changes: 2 additions & 2 deletions internal/store/segment/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func (r *replica) Read(ctx context.Context, seq int64, num int) ([]block.Entry,
return r.raw.Read(ctx, seq, num)
}

func (r *replica) Append(ctx context.Context, entries ...block.Entry) ([]int64, error) {
return r.appender.Append(ctx, entries...)
func (r *replica) Append(ctx context.Context, entries []block.Entry, cb block.AppendCallback) {
r.appender.Append(ctx, entries, cb)
}

func (r *replica) Status() *metapb.SegmentHealthInfo {
Expand Down
27 changes: 26 additions & 1 deletion internal/store/segment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ type leaderInfo struct {
term uint64
}

// appendResult is the outcome of a single append: the sequence numbers that
// were assigned, or the error that occurred.
type appendResult struct {
	seqs []int64
	err  error
}

// appendFuture delivers one appendResult from the append callback to the
// goroutine waiting for it. It is a channel with room for a single result.
type appendFuture chan appendResult

// newAppendFuture creates a future with a one-slot buffer, so the first
// onAppended call completes even when nobody is waiting yet.
func newAppendFuture() appendFuture {
	const slots = 1
	return make(chan appendResult, slots)
}

// onAppended stores the outcome of the append in the future. Its signature
// matches the append callback, so it can be passed directly as one.
func (af appendFuture) onAppended(seqs []int64, err error) {
	var outcome appendResult
	outcome.seqs, outcome.err = seqs, err
	af <- outcome
}

// wait blocks until onAppended has delivered a result and returns it.
func (af appendFuture) wait() ([]int64, error) {
	outcome := <-af
	seqs, err := outcome.seqs, outcome.err
	return seqs, err
}

type server struct {
replicas sync.Map // vanus.ID, Replica

Expand Down Expand Up @@ -667,7 +690,9 @@ func (s *server) AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb.
metrics.WriteTPSCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(len(events)))
metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(size))

seqs, err := b.Append(ctx, entries...)
future := newAppendFuture()
b.Append(ctx, entries, future.onAppended)
seqs, err := future.wait()
if err != nil {
return nil, s.processAppendError(ctx, b, err)
}
Expand Down
Loading

0 comments on commit 42fdb85

Please sign in to comment.