Skip to content

Commit

Permalink
fix(store): break wal format after recover (#389)
Browse files Browse the repository at this point in the history
Signed-off-by: James Yin <[email protected]>

Signed-off-by: James Yin <[email protected]>
  • Loading branch information
ifplusor authored Jan 5, 2023
1 parent 394c0e9 commit 303e88a
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 4 deletions.
3 changes: 1 addition & 2 deletions internal/store/block/raft/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ func (a *appender) applyEntries(ctx context.Context, committedEntries []raftpb.E
span.AddEvent("store.block.raft.appender.applyEntries() Start")
defer span.AddEvent("store.block.raft.appender.applyEntries() End")

var cs *raftpb.ConfState
for i := 0; i < len(committedEntries); i++ {
pbEntry := &committedEntries[i]
index := pbEntry.Index
Expand All @@ -299,7 +298,7 @@ func (a *appender) applyEntries(ctx context.Context, committedEntries []raftpb.E
}

// Change membership.
cs = a.applyConfChange(ctx, pbEntry)
cs := a.applyConfChange(ctx, pbEntry)
ch := make(chan struct{})
go func() {
if err := a.log.SetConfState(ctx, *cs); err != nil {
Expand Down
1 change: 0 additions & 1 deletion internal/store/io/stream/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (s *scheduler) Register(z zone.Interface, wo int64) Stream {
if err := buf.RecoverFromFile(f, off, int(so)); err != nil {
panic(err)
}
// FIXME(james.yin): switch buf if it is full.
}

ss := &stream{
Expand Down
3 changes: 2 additions & 1 deletion internal/store/io/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ func (s *stream) Append(r stdio.Reader, cb io.WriteCallback) {

if empty {
s.waiting = append(s.waiting, cb)
if last == nil && !s.dirty {
if last == nil {
s.dirty = true
s.startFlushTimer()
return
}
}

Expand Down
6 changes: 6 additions & 0 deletions internal/store/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/linkall-labs/vanus/internal/store/io/engine"
"github.com/linkall-labs/vanus/internal/store/io/stream"
"github.com/linkall-labs/vanus/internal/store/io/zone/segmentedfile"
"github.com/linkall-labs/vanus/internal/store/wal/record"
)

const (
Expand Down Expand Up @@ -132,6 +133,11 @@ func open(ctx context.Context, dir string, cfg config) (*WAL, error) {
return nil, err
}

// Skip padding.
if padding := int64(cfg.blockSize) - off%int64(cfg.blockSize); padding < record.HeaderSize {
off += padding
}

scheduler := stream.NewScheduler(cfg.engine, cfg.blockSize, cfg.flushTimeout)
s := scheduler.Register(sf, off)

Expand Down

0 comments on commit 303e88a

Please sign in to comment.