diff --git a/example_test.go b/example_test.go index a693ddea..9e34a3f2 100644 --- a/example_test.go +++ b/example_test.go @@ -18,10 +18,10 @@ import ( pb "go.etcd.io/raft/v3/raftpb" ) -func applyToStore(ents []pb.Entry) {} -func sendMessages(msgs []pb.Message) {} -func saveStateToDisk(st pb.HardState) {} -func saveToDisk(ents []pb.Entry) {} +func applyToStore(_ []pb.Entry) {} +func sendMessages(_ []pb.Message) {} +func saveStateToDisk(_ pb.HardState) {} +func saveToDisk(_ []pb.Entry) {} func ExampleNode() { c := &Config{} diff --git a/node_test.go b/node_test.go index 068002b8..281ada19 100644 --- a/node_test.go +++ b/node_test.go @@ -1090,7 +1090,7 @@ type ignoreSizeHintMemStorage struct { *MemoryStorage } -func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) { +func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, _ uint64) ([]raftpb.Entry, error) { return s.MemoryStorage.Entries(lo, hi, math.MaxUint64) } diff --git a/raftexample/etcdserverpb/raft_internal_stringer.go b/raftexample/etcdserverpb/raft_internal_stringer.go index 31e121ee..3ab1631f 100644 --- a/raftexample/etcdserverpb/raft_internal_stringer.go +++ b/raftexample/etcdserverpb/raft_internal_stringer.go @@ -78,6 +78,7 @@ type txnRequestStringer struct { Request *TxnRequest } +//nolint:revive func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer { return &txnRequestStringer{request} } @@ -167,6 +168,7 @@ type loggablePutRequest struct { IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"` } +//nolint:revive func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest { return &loggablePutRequest{ request.Key, diff --git a/raftexample/fileutil/lock_linux.go b/raftexample/fileutil/lock_linux.go new file mode 100644 index 00000000..c33a2f4a --- /dev/null +++ b/raftexample/fileutil/lock_linux.go @@ -0,0 +1,92 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux + +package fileutil + +import ( + "fmt" + "io" + "os" + "syscall" + + "golang.org/x/sys/unix" +) + +// This used to call syscall.Flock() but that call fails with EBADF on NFS. +// An alternative is lockf() which works on NFS but that call lets a process lock +// the same file twice. Instead, use Linux's non-standard open file descriptor +// locks which will block if the process already holds the file lock. 
+ +var ( + wrlck = syscall.Flock_t{ + Type: syscall.F_WRLCK, + Whence: int16(io.SeekStart), + Start: 0, + Len: 0, + } + + linuxTryLockFile = flockTryLockFile + linuxLockFile = flockLockFile +) + +func init() { + // use open file descriptor locks if the system supports it + getlk := syscall.Flock_t{Type: syscall.F_RDLCK} + if err := syscall.FcntlFlock(0, unix.F_OFD_GETLK, &getlk); err == nil { + linuxTryLockFile = ofdTryLockFile + linuxLockFile = ofdLockFile + } +} + +func TryLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) { + return linuxTryLockFile(path, flag, perm) +} + +func ofdTryLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) { + f, err := os.OpenFile(path, flag, perm) + if err != nil { + return nil, fmt.Errorf("ofdTryLockFile failed to open %q (%v)", path, err) + } + + flock := wrlck + if err = syscall.FcntlFlock(f.Fd(), unix.F_OFD_SETLK, &flock); err != nil { + f.Close() + if err == syscall.EWOULDBLOCK { + err = ErrLocked + } + return nil, err + } + return &LockedFile{f}, nil +} + +func LockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) { + return linuxLockFile(path, flag, perm) +} + +func ofdLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) { + f, err := os.OpenFile(path, flag, perm) + if err != nil { + return nil, fmt.Errorf("ofdLockFile failed to open %q (%v)", path, err) + } + + flock := wrlck + err = syscall.FcntlFlock(f.Fd(), unix.F_OFD_SETLKW, &flock) + if err != nil { + f.Close() + return nil, err + } + return &LockedFile{f}, nil +} diff --git a/raftexample/httpapi.go b/raftexample/httpapi.go index dbe226ad..4d869600 100644 --- a/raftexample/httpapi.go +++ b/raftexample/httpapi.go @@ -23,9 +23,22 @@ import ( "go.etcd.io/raft/v3/raftpb" ) +// KVStore is the interface to the key-value store that is required by +// `httpKVAPI`. +type KVStore interface { + // Propose proposes to set key `k` to value `v` in the key-value + // store. It returns immediately, before the change has + // necessarily taken effect. + Propose(k, v string) + + // Lookup looks up the value for key `k` in the current state of + // the store. + Lookup(k string) (string, bool) +} + // Handler for a http based key-value store backed by raft type httpKVAPI struct { - store *kvstore + store KVStore confChangeC chan<- raftpb.ConfChange } @@ -101,7 +114,9 @@ func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // serveHTTPKVAPI starts a key-value server with a GET/PUT API and listens. 
-func serveHTTPKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
+func serveHTTPKVAPI(
+	kv KVStore, port int, confChangeC chan<- raftpb.ConfChange, done <-chan struct{},
+) {
 	srv := http.Server{
 		Addr:    ":" + strconv.Itoa(port),
 		Handler: &httpKVAPI{
@@ -116,7 +131,6 @@ func serveHTTPKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange,
 	}()
 
 	// exit when raft goes down
-	if err, ok := <-errorC; ok {
-		log.Fatal(err)
-	}
+	<-done
+	_ = srv.Close()
 }
diff --git a/raftexample/kvstore.go b/raftexample/kvstore.go
index c30c3387..7aa87899 100644
--- a/raftexample/kvstore.go
+++ b/raftexample/kvstore.go
@@ -18,20 +18,17 @@ import (
 	"bytes"
 	"encoding/gob"
 	"encoding/json"
+	"fmt"
 	"log"
 	"strings"
 	"sync"
-
-	"go.etcd.io/raft/v3/raftexample/snap"
-	"go.etcd.io/raft/v3/raftpb"
 )
 
 // a key-value store backed by raft
 type kvstore struct {
-	proposeC    chan<- string // channel for proposing updates
-	mu          sync.RWMutex
-	kvStore     map[string]string // current committed key-value pairs
-	snapshotter *snap.Snapshotter
+	proposeC chan<- string // channel for proposing updates
+	mu       sync.RWMutex
+	kvStore  map[string]string // current committed key-value pairs
 }
 
 type kv struct {
@@ -39,21 +36,18 @@ type kv struct {
 	Val string
 }
 
-func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore {
-	s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
-	snapshot, err := s.loadSnapshot()
-	if err != nil {
-		log.Panic(err)
+// newKVStore creates and returns a new `kvstore`. The second return
+// value can be used as the finite state machine that is driven by a
+// `raftNode`.
+func newKVStore(proposeC chan<- string) (*kvstore, kvfsm) {
+	s := &kvstore{
+		proposeC: proposeC,
+		kvStore:  make(map[string]string),
 	}
-	if snapshot != nil {
-		log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
-		if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
-			log.Panic(err)
-		}
+	fsm := kvfsm{
+		kvs: s,
 	}
-	// read commits from raft into kvStore map until error
-	go s.readCommits(commitC, errorC)
-	return s
+	return s, fsm
 }
 
 func (s *kvstore) Lookup(key string) (string, bool) {
@@ -71,64 +65,53 @@ func (s *kvstore) Propose(k string, v string) {
 	s.proposeC <- buf.String()
 }
 
-func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
-	for commit := range commitC {
-		if commit == nil {
-			// signaled to load snapshot
-			snapshot, err := s.loadSnapshot()
-			if err != nil {
-				log.Panic(err)
-			}
-			if snapshot != nil {
-				log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
-				if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
-					log.Panic(err)
-				}
-			}
-			continue
-		}
-
-		for _, data := range commit.data {
-			var dataKv kv
-			dec := gob.NewDecoder(bytes.NewBufferString(data))
-			if err := dec.Decode(&dataKv); err != nil {
-				log.Fatalf("raftexample: could not decode message (%v)", err)
-			}
-			s.mu.Lock()
-			s.kvStore[dataKv.Key] = dataKv.Val
-			s.mu.Unlock()
-		}
-		close(commit.applyDoneC)
-	}
-	if err, ok := <-errorC; ok {
-		log.Fatal(err)
-	}
+// set sets a single value. It should only be called by `kvfsm`. 
+func (s *kvstore) set(k, v string) { + s.mu.Lock() + defer s.mu.Unlock() + s.kvStore[k] = v } -func (s *kvstore) getSnapshot() ([]byte, error) { - s.mu.RLock() - defer s.mu.RUnlock() - return json.Marshal(s.kvStore) +func (s *kvstore) restoreFromSnapshot(store map[string]string) { + s.mu.Lock() + defer s.mu.Unlock() + s.kvStore = store } -func (s *kvstore) loadSnapshot() (*raftpb.Snapshot, error) { - snapshot, err := s.snapshotter.Load() - if err == snap.ErrNoSnapshot { - return nil, nil - } - if err != nil { - return nil, err - } - return snapshot, nil +// kvfsm implements the `FSM` interface for the underlying `*kvstore`. +type kvfsm struct { + kvs *kvstore } -func (s *kvstore) recoverFromSnapshot(snapshot []byte) error { +// RestoreSnapshot restores the current state of the KV store to the +// value encoded in `snapshot`. +func (fsm kvfsm) RestoreSnapshot(snapshot []byte) error { var store map[string]string if err := json.Unmarshal(snapshot, &store); err != nil { return err } - s.mu.Lock() - defer s.mu.Unlock() - s.kvStore = store + fsm.kvs.restoreFromSnapshot(store) + return nil +} + +func (fsm kvfsm) TakeSnapshot() ([]byte, error) { + fsm.kvs.mu.RLock() + defer fsm.kvs.mu.RUnlock() + return json.Marshal(fsm.kvs.kvStore) +} + +// ApplyCommits decodes and applies each of the commits in `commit` to +// the current state, then signals that it is done by closing +// `commit.applyDoneC`. +func (fsm kvfsm) ApplyCommits(commit *commit) error { + for _, data := range commit.data { + var dataKv kv + dec := gob.NewDecoder(bytes.NewBufferString(data)) + if err := dec.Decode(&dataKv); err != nil { + return fmt.Errorf("could not decode message: %w", err) + } + fsm.kvs.set(dataKv.Key, dataKv.Val) + } + close(commit.applyDoneC) return nil } diff --git a/raftexample/kvstore_test.go b/raftexample/kvstore_test.go index 231f778f..8e35875b 100644 --- a/raftexample/kvstore_test.go +++ b/raftexample/kvstore_test.go @@ -22,19 +22,22 @@ import ( func Test_kvstore_snapshot(t *testing.T) { tm := map[string]string{"foo": "bar"} s := &kvstore{kvStore: tm} + fsm := kvfsm{ + kvs: s, + } v, _ := s.Lookup("foo") if v != "bar" { t.Fatalf("foo has unexpected value, got %s", v) } - data, err := s.getSnapshot() + data, err := fsm.TakeSnapshot() if err != nil { t.Fatal(err) } s.kvStore = nil - if err := s.recoverFromSnapshot(data); err != nil { + if err := fsm.RestoreSnapshot(data); err != nil { t.Fatal(err) } v, _ = s.Lookup("foo") diff --git a/raftexample/main.go b/raftexample/main.go index 73f02787..e9db0051 100644 --- a/raftexample/main.go +++ b/raftexample/main.go @@ -16,14 +16,18 @@ package main import ( "flag" + "fmt" + "log" "strings" + "go.uber.org/zap" + "go.etcd.io/raft/v3/raftpb" ) func main() { cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers") - id := flag.Int("id", 1, "node ID") + id := flag.Uint64("id", 1, "node ID") kvport := flag.Int("port", 9121, "key-value server port") join := flag.Bool("join", false, "join an existing cluster") flag.Parse() @@ -34,12 +38,28 @@ func main() { defer close(confChangeC) // raft provides a commit stream for the proposals from the http api - var kvs *kvstore - getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() } - commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) - kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) + snapshotLogger := zap.NewExample() + snapdir := fmt.Sprintf("raftexample-%d-snap", *id) + snapshotStorage, err := 
newSnapshotStorage(snapshotLogger, snapdir) + if err != nil { + log.Fatalf("raftexample: %v", err) + } + + kvs, fsm := newKVStore(proposeC) + + rc := startRaftNode( + *id, strings.Split(*cluster, ","), *join, + fsm, snapshotStorage, + proposeC, confChangeC, + ) + + go func() { + if err := rc.ProcessCommits(); err != nil { + log.Fatalf("raftexample: %v", err) + } + }() // the key-value http handler will propose updates to raft - serveHTTPKVAPI(kvs, *kvport, confChangeC, errorC) + serveHTTPKVAPI(kvs, *kvport, confChangeC, rc.Done()) } diff --git a/raftexample/raft.go b/raftexample/raft.go index 08e83844..dfcbd80c 100644 --- a/raftexample/raft.go +++ b/raftexample/raft.go @@ -46,15 +46,21 @@ type commit struct { type raftNode struct { proposeC <-chan string // proposed messages (k,v) confChangeC <-chan raftpb.ConfChange // proposed cluster config changes - commitC chan<- *commit // entries committed to log (k,v) - errorC chan<- error // errors from raft session + commitC chan *commit // entries committed to log (k,v) + errorC chan error // errors from raft session - id int // client ID for raft session - peers []string // raft peer URLs - join bool // node is joining an existing cluster - waldir string // path to WAL directory - snapdir string // path to snapshot directory - getSnapshot func() ([]byte, error) + // When serveChannels is done, `err` is set to any error and then + // `done` is closed. + err error + done chan struct{} + + id uint64 // client ID for raft session + peers []string // raft peer URLs + join bool // node is joining an existing cluster + waldir string // path to WAL directory + fsm FSM + + snapshotStorage SnapshotStorage confState raftpb.ConfState snapshotIndex uint64 @@ -65,9 +71,6 @@ type raftNode struct { raftStorage *raft.MemoryStorage wal *wal.WAL - snapshotter *snap.Snapshotter - snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready - snapCount uint64 transport *rafthttp.Transport stopc chan struct{} // signals proposal channel closed @@ -77,42 +80,139 @@ type raftNode struct { logger *zap.Logger } +// SnapshotStorage is the interface that must be fulfilled by the +// persistent storage for snapshots. +type SnapshotStorage interface { + // SaveSnap saves `snapshot` to persistent storage. + SaveSnap(snapshot raftpb.Snapshot) error + + // Load reads and returns the newest snapshot that is + // available. + Load() (*raftpb.Snapshot, error) + + // LoadNewestAvailable loads the newest available snapshot + // whose term and index matches one of those in walSnaps. + LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) +} + var defaultSnapshotCount uint64 = 10000 -// newRaftNode initiates a raft instance and returns a committed log entry +// FSM is the interface that must be implemented by a finite state +// machine for it to be driven by raft. +type FSM interface { + // TakeSnapshot takes a snapshot of the current state of the + // finite state machine, returning the snapshot as a slice of + // bytes that can be saved or loaded by a `SnapshotStorage`. + TakeSnapshot() ([]byte, error) + + // RestoreSnapshot restores the finite state machine to the state + // represented by `snapshot` (which, in turn, was returned by + // `TakeSnapshot`). + RestoreSnapshot(snapshot []byte) error + + // ApplyCommits applies the changes from `commit` to the finite + // state machine. `commit` is never `nil`. (By contrast, the + // commits that are handled by `ProcessCommits()` can be `nil` to + // signal that a snapshot should be loaded.) 
+	ApplyCommits(commit *commit) error
+}
+
-// channel and error channel. Proposals for log updates are sent over the
-// provided the proposal channel. All log entries are replayed over the
-// commit channel, followed by a nil message (to indicate the channel is
-// current), then new log entries. To shutdown, close proposeC and read errorC.
-func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
-	confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {
-
+// startRaftNode initiates a raft instance and returns the resulting
+// raftNode. Proposals for log updates are sent over the provided
+// proposal channel. All log entries are replayed over the commit
+// channel, followed by a nil message (to indicate the channel is
+// current), then new log entries. To shut down, close proposeC and
+// wait for the channel returned by Done() to be closed.
+func startRaftNode(
+	id uint64, peers []string, join bool,
+	fsm FSM, snapshotStorage SnapshotStorage,
+	proposeC <-chan string, confChangeC <-chan raftpb.ConfChange,
+) *raftNode {
 	commitC := make(chan *commit)
 	errorC := make(chan error)
 
 	rc := &raftNode{
-		proposeC:    proposeC,
-		confChangeC: confChangeC,
-		commitC:     commitC,
-		errorC:      errorC,
-		id:          id,
-		peers:       peers,
-		join:        join,
-		waldir:      fmt.Sprintf("raftexample-%d", id),
-		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
-		getSnapshot: getSnapshot,
-		snapCount:   defaultSnapshotCount,
-		stopc:       make(chan struct{}),
-		httpstopc:   make(chan struct{}),
-		httpdonec:   make(chan struct{}),
+		proposeC:        proposeC,
+		confChangeC:     confChangeC,
+		commitC:         commitC,
+		errorC:          errorC,
+		done:            make(chan struct{}),
+		id:              id,
+		peers:           peers,
+		join:            join,
+		waldir:          fmt.Sprintf("raftexample-%d", id),
+		fsm:             fsm,
+		snapshotStorage: snapshotStorage,
+		snapCount:       defaultSnapshotCount,
+		stopc:           make(chan struct{}),
+		httpstopc:       make(chan struct{}),
+		httpdonec:       make(chan struct{}),
 
 		logger: zap.NewExample(),
-
-		snapshotterReady: make(chan *snap.Snapshotter, 1),
 		// rest of structure populated after WAL replay
 	}
-	go rc.startRaft()
-	return commitC, errorC, rc.snapshotterReady
+
+	rc.loadAndApplySnapshot()
+
+	oldwal := wal.Exist(rc.waldir)
+	rc.wal = rc.replayWAL()
+
+	go rc.startRaft(oldwal)
+
+	return rc
+}
+
+// loadAndApplySnapshot loads the most recent snapshot from the
+// snapshot storage (if any) and applies it to the current state.
+func (rc *raftNode) loadAndApplySnapshot() {
+	snapshot, err := rc.snapshotStorage.Load()
+	if err != nil {
+		if err == snap.ErrNoSnapshot {
+			// No snapshots available; do nothing.
+			return
+		}
+		log.Panic(err)
+	}
+
+	log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
+	if err := rc.fsm.RestoreSnapshot(snapshot.Data); err != nil {
+		log.Panic(err)
+	}
+}
+
+// ProcessCommits reads commits from `commitC` and applies them to the
+// FSM until that channel is closed.
+func (rc *raftNode) ProcessCommits() error {
+	for commit := range rc.commitC {
+		if commit == nil {
+			// This is a request to load a snapshot.
+			rc.loadAndApplySnapshot()
+			continue
+		}
+
+		if err := rc.fsm.ApplyCommits(commit); err != nil {
+			return err
+		}
+	}
+	<-rc.done
+	return rc.err
+}
+
+// Done returns a channel that is closed when `rc` is done processing
+// requests.
+func (rc *raftNode) Done() <-chan struct{} {
+	return rc.done
+}
+
+// Err returns any error encountered while processing requests, or nil
+// if request processing is not yet done.
+func (rc *raftNode) Err() error {
+	select {
+	case <-rc.done:
+		return rc.err
+	default:
+		return nil
+	}
 }
 
 func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
@@ -124,7 +224,7 @@ func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
 	// save the snapshot file before writing the snapshot to the wal. 
// This makes it possible for the snapshot file to become orphaned, but prevents // a WAL snapshot entry from having no corresponding snapshot file. - if err := rc.snapshotter.SaveSnap(snap); err != nil { + if err := rc.snapshotStorage.SaveSnap(snap); err != nil { return err } if err := rc.wal.SaveSnapshot(walSnap); err != nil { @@ -174,7 +274,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)}) } case raftpb.ConfChangeRemoveNode: - if cc.NodeID == uint64(rc.id) { + if cc.NodeID == rc.id { log.Println("I've been removed from the cluster! Shutting down.") return nil, false } @@ -206,7 +306,7 @@ func (rc *raftNode) loadSnapshot() *raftpb.Snapshot { if err != nil { log.Fatalf("raftexample: error listing snapshots (%v)", err) } - snapshot, err := rc.snapshotter.LoadNewestAvailable(walSnaps) + snapshot, err := rc.snapshotStorage.LoadNewestAvailable(walSnaps) if err != nil && err != snap.ErrNoSnapshot { log.Fatalf("raftexample: error loading snapshot (%v)", err) } @@ -266,31 +366,20 @@ func (rc *raftNode) replayWAL() *wal.WAL { func (rc *raftNode) writeError(err error) { rc.stopHTTP() close(rc.commitC) + rc.err = err rc.errorC <- err close(rc.errorC) + close(rc.done) rc.node.Stop() } -func (rc *raftNode) startRaft() { - if !fileutil.Exist(rc.snapdir) { - if err := os.Mkdir(rc.snapdir, 0750); err != nil { - log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err) - } - } - rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir) - - oldwal := wal.Exist(rc.waldir) - rc.wal = rc.replayWAL() - - // signal replay has finished - rc.snapshotterReady <- rc.snapshotter - +func (rc *raftNode) startRaft(oldwal bool) { rpeers := make([]raft.Peer, len(rc.peers)) for i := range rpeers { rpeers[i] = raft.Peer{ID: uint64(i + 1)} } c := &raft.Config{ - ID: uint64(rc.id), + ID: rc.id, ElectionTick: 10, HeartbeatTick: 1, Storage: rc.raftStorage, @@ -311,13 +400,15 @@ func (rc *raftNode) startRaft() { ClusterID: 0x1000, Raft: rc, ServerStats: stats.NewServerStats("", ""), - LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)), - ErrorC: make(chan error), + LeaderStats: stats.NewLeaderStats( + zap.NewExample(), strconv.FormatUint(rc.id, 10), + ), + ErrorC: make(chan error), } rc.transport.Start() for i := range rc.peers { - if i+1 != rc.id { + if uint64(i+1) != rc.id { rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) } } @@ -331,6 +422,7 @@ func (rc *raftNode) stop() { rc.stopHTTP() close(rc.commitC) close(rc.errorC) + close(rc.done) rc.node.Stop() } @@ -375,7 +467,7 @@ func (rc *raftNode) maybeTriggerSnapshot(applyDoneC <-chan struct{}) { } log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex) - data, err := rc.getSnapshot() + data, err := rc.fsm.TakeSnapshot() if err != nil { log.Panic(err) } @@ -515,8 +607,17 @@ func (rc *raftNode) serveRaft() { func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error { return rc.node.Step(ctx, m) } -func (rc *raftNode) IsIDRemoved(id uint64) bool { return false } +func (rc *raftNode) IsIDRemoved(_ uint64) bool { return false } func (rc *raftNode) ReportUnreachable(id uint64) { rc.node.ReportUnreachable(id) } func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) { rc.node.ReportSnapshot(id, status) } + +func newSnapshotStorage(lg *zap.Logger, dir string) (SnapshotStorage, error) { + if !fileutil.Exist(dir) { + if err := os.Mkdir(dir, 0750); err != 
nil {
+			return nil, fmt.Errorf("cannot create dir for snapshot: %w", err)
+		}
+	}
+	return snap.New(lg, dir), nil
+}
diff --git a/raftexample/raftexample_test.go b/raftexample/raftexample_test.go
index 6a8e5274..40d74ec4 100644
--- a/raftexample/raftexample_test.go
+++ b/raftexample/raftexample_test.go
@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"fmt"
 	"io"
+	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -25,72 +26,102 @@ import (
 	"testing"
 	"time"
 
+	"go.uber.org/zap"
+
 	"go.etcd.io/raft/v3/raftpb"
 )
 
-func getSnapshotFn() (func() ([]byte, error), <-chan struct{}) {
-	snapshotTriggeredC := make(chan struct{})
-	return func() ([]byte, error) {
-		snapshotTriggeredC <- struct{}{}
-		return nil, nil
-	}, snapshotTriggeredC
+type peer struct {
+	node        *raftNode
+	proposeC    chan string
+	confChangeC chan raftpb.ConfChange
+	fsm         FSM
+}
+
+type nullFSM struct{}
+
+func (nullFSM) TakeSnapshot() ([]byte, error) {
+	return nil, nil
+}
+
+func (nullFSM) RestoreSnapshot(_ []byte) error {
+	return nil
+}
+
+func (nullFSM) ApplyCommits(_ *commit) error {
+	return nil
 }
 
 type cluster struct {
-	peers              []string
-	commitC            []<-chan *commit
-	errorC             []<-chan error
-	proposeC           []chan string
-	confChangeC        []chan raftpb.ConfChange
-	snapshotTriggeredC []<-chan struct{}
+	peerNames []string
+	peers     []*peer
 }
 
-// newCluster creates a cluster of n nodes
-func newCluster(n int) *cluster {
-	peers := make([]string, n)
-	for i := range peers {
-		peers[i] = fmt.Sprintf("http://127.0.0.1:%d", 10000+i)
+// newCluster creates a cluster with one node for each given FSM.
+func newCluster(fsms ...FSM) *cluster {
+	clus := cluster{
+		peerNames: make([]string, 0, len(fsms)),
+		peers:     make([]*peer, 0, len(fsms)),
 	}
+	for i := range fsms {
+		clus.peerNames = append(clus.peerNames, fmt.Sprintf("http://127.0.0.1:%d", 10000+i))
+	}
+
+	for i, fsm := range fsms {
+		id := uint64(i + 1)
+		peer := peer{
+			proposeC:    make(chan string, 1),
+			confChangeC: make(chan raftpb.ConfChange, 1),
+			fsm:         fsm,
+		}
+
+		snapdir := fmt.Sprintf("raftexample-%d-snap", id)
+		os.RemoveAll(fmt.Sprintf("raftexample-%d", id))
+		os.RemoveAll(snapdir)
 
-	clus := &cluster{
-		peers:              peers,
-		commitC:            make([]<-chan *commit, len(peers)),
-		errorC:             make([]<-chan error, len(peers)),
-		proposeC:           make([]chan string, len(peers)),
-		confChangeC:        make([]chan raftpb.ConfChange, len(peers)),
-		snapshotTriggeredC: make([]<-chan struct{}, len(peers)),
+		snapshotLogger := zap.NewExample()
+		snapshotStorage, err := newSnapshotStorage(snapshotLogger, snapdir)
+		if err != nil {
+			log.Fatalf("raftexample: %v", err)
+		}
+
+		peer.node = startRaftNode(
+			id, clus.peerNames, false,
+			peer.fsm, snapshotStorage,
+			peer.proposeC, peer.confChangeC,
+		)
+		clus.peers = append(clus.peers, &peer)
 	}
+	return &clus
+}
+
+// Cleanup cleans up temporary files used by the test cluster.
+func (clus *cluster) Cleanup() {
 	for i := range clus.peers {
 		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
 		os.RemoveAll(fmt.Sprintf("raftexample-%d-snap", i+1))
-		clus.proposeC[i] = make(chan string, 1)
-		clus.confChangeC[i] = make(chan raftpb.ConfChange, 1)
-		fn, snapshotTriggeredC := getSnapshotFn()
-		clus.snapshotTriggeredC[i] = snapshotTriggeredC
-		clus.commitC[i], clus.errorC[i], _ = newRaftNode(i+1, clus.peers, false, fn, clus.proposeC[i], clus.confChangeC[i])
 	}
-
-	return clus
 }
 
 // Close closes all cluster nodes and returns an error if any failed. 
func (clus *cluster) Close() (err error) {
-	for i := range clus.peers {
-		go func(i int) {
-			for range clus.commitC[i] {
+	for _, peer := range clus.peers {
+		peer := peer
+		go func() {
+			//nolint:revive
+			for range peer.node.commitC {
 				// drain pending commits
 			}
-		}(i)
-		close(clus.proposeC[i])
+		}()
+		close(peer.proposeC)
 		// wait for channel to close
-		if erri := <-clus.errorC[i]; erri != nil {
+		<-peer.node.Done()
+		if erri := peer.node.Err(); erri != nil {
 			err = erri
 		}
-		// clean intermediates
-		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
-		os.RemoveAll(fmt.Sprintf("raftexample-%d-snap", i+1))
 	}
+	clus.Cleanup()
 	return err
 }
@@ -102,47 +133,94 @@ func (clus *cluster) closeNoErrors(t *testing.T) {
 	t.Log("closing cluster [done]")
 }
 
+type feedbackFSM struct {
+	nullFSM
+	peer     *peer
+	id       int
+	reEcho   int
+	expected int
+	received int
+}
+
+func newFeedbackFSM(id int, reEcho int, expected int) *feedbackFSM {
+	return &feedbackFSM{
+		id:       id,
+		reEcho:   reEcho,
+		expected: expected,
+	}
+}
+
+func (fsm *feedbackFSM) ApplyCommits(commit *commit) error {
+	for _, msg := range commit.data {
+		var originator, source, index int
+		if n, err := fmt.Sscanf(msg, "foo%d-%d-%d", &originator, &source, &index); err != nil || n != 3 {
+			panic(err)
+		}
+		if fsm.reEcho > 0 {
+			fsm.peer.proposeC <- fmt.Sprintf("foo%d-%d-%d", originator, fsm.id, index+1)
+			fsm.reEcho--
+		}
+
+		fsm.received++
+		if fsm.received == fsm.expected {
+			close(fsm.peer.proposeC)
+		}
+	}
+
+	close(commit.applyDoneC)
+
+	return nil
+}
+
 // TestProposeOnCommit starts three nodes and feeds commits back into the proposal
 // channel. The intent is to ensure blocking on a proposal won't block raft progress.
 func TestProposeOnCommit(t *testing.T) {
-	clus := newCluster(3)
-	defer clus.closeNoErrors(t)
+	// We generate one proposal for each node to kick things off, then
+	// each node "echoes" back the first 99 commits that it receives.
+	// So the total number of commits that each node expects to see is
+	// 300. 
+ fsms := []*feedbackFSM{ + newFeedbackFSM(1, 99, 300), + newFeedbackFSM(2, 99, 300), + newFeedbackFSM(3, 99, 300), + } + clus := newCluster(fsms[0], fsms[1], fsms[2]) + for i := range fsms { + fsms[i].peer = clus.peers[i] + } + defer clus.Cleanup() - donec := make(chan struct{}) - for i := range clus.peers { - // feedback for "n" committed entries, then update donec - go func(pC chan<- string, cC <-chan *commit, eC <-chan error) { - for n := 0; n < 100; n++ { - c, ok := <-cC - if !ok { - pC = nil - } - select { - case pC <- c.data[0]: - continue - case err := <-eC: - t.Errorf("eC message (%v)", err) - } - } - donec <- struct{}{} - for range cC { - // acknowledge the commits from other nodes so - // raft continues to make progress + var wg sync.WaitGroup + for _, fsm := range fsms { + fsm := fsm + wg.Add(1) + go func() { + defer wg.Done() + if err := fsm.peer.node.ProcessCommits(); err != nil { + t.Error("ProcessCommits returned error", err) } - }(clus.proposeC[i], clus.commitC[i], clus.errorC[i]) - - // one message feedback per node - go func(i int) { clus.proposeC[i] <- "foo" }(i) + }() + + // Trigger the whole cascade by sending one message per node: + wg.Add(1) + go func() { + defer wg.Done() + fsm.peer.proposeC <- fmt.Sprintf("foo%d-%d-%d", fsm.id, fsm.id, 0) + }() } - for range clus.peers { - <-donec + wg.Wait() + + for _, fsm := range fsms { + if fsm.received != fsm.expected { + t.Errorf("node %d received %d commits (expected %d)", fsm.id, fsm.received, fsm.expected) + } } } // TestCloseProposerBeforeReplay tests closing the producer before raft starts. func TestCloseProposerBeforeReplay(t *testing.T) { - clus := newCluster(1) + clus := newCluster(nullFSM{}) // close before replay so raft never starts defer clus.closeNoErrors(t) } @@ -150,7 +228,7 @@ func TestCloseProposerBeforeReplay(t *testing.T) { // TestCloseProposerInflight tests closing the producer while // committed messages are being published to the client. 
func TestCloseProposerInflight(t *testing.T) { - clus := newCluster(1) + clus := newCluster(nullFSM{}) defer clus.closeNoErrors(t) var wg sync.WaitGroup @@ -159,12 +237,12 @@ func TestCloseProposerInflight(t *testing.T) { // some inflight ops go func() { defer wg.Done() - clus.proposeC[0] <- "foo" - clus.proposeC[0] <- "bar" + clus.peers[0].proposeC <- "foo" + clus.peers[0].proposeC <- "bar" }() // wait for one message - if c, ok := <-clus.commitC[0]; !ok || c.data[0] != "foo" { + if c, ok := <-clus.peers[0].node.commitC; !ok || c.data[0] != "foo" { t.Fatalf("Commit failed") } @@ -180,11 +258,27 @@ func TestPutAndGetKeyValue(t *testing.T) { confChangeC := make(chan raftpb.ConfChange) defer close(confChangeC) - var kvs *kvstore - getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() } - commitC, errorC, snapshotterReady := newRaftNode(1, clusters, false, getSnapshot, proposeC, confChangeC) + id := uint64(1) + snapshotLogger := zap.NewExample() + snapdir := fmt.Sprintf("raftexample-%d-snap", id) + snapshotStorage, err := newSnapshotStorage(snapshotLogger, snapdir) + if err != nil { + log.Fatalf("raftexample: %v", err) + } + + kvs, fsm := newKVStore(proposeC) - kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) + node := startRaftNode( + id, clusters, false, + fsm, snapshotStorage, + proposeC, confChangeC, + ) + + go func() { + if err := node.ProcessCommits(); err != nil { + log.Fatalf("raftexample: %v", err) + } + }() srv := httptest.NewServer(&httpKVAPI{ store: kvs, @@ -231,20 +325,22 @@ func TestPutAndGetKeyValue(t *testing.T) { // TestAddNewNode tests adding new node to the existing cluster. func TestAddNewNode(t *testing.T) { - clus := newCluster(3) + clus := newCluster(nullFSM{}, nullFSM{}, nullFSM{}) defer clus.closeNoErrors(t) + id := uint64(4) + snapdir := fmt.Sprintf("raftexample-%d-snap", id) os.RemoveAll("raftexample-4") - os.RemoveAll("raftexample-4-snap") + os.RemoveAll(snapdir) defer func() { os.RemoveAll("raftexample-4") - os.RemoveAll("raftexample-4-snap") + os.RemoveAll(snapdir) }() newNodeURL := "http://127.0.0.1:10004" - clus.confChangeC[0] <- raftpb.ConfChange{ + clus.peers[0].confChangeC <- raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, - NodeID: 4, + NodeID: id, Context: []byte(newNodeURL), } @@ -254,17 +350,37 @@ func TestAddNewNode(t *testing.T) { confChangeC := make(chan raftpb.ConfChange) defer close(confChangeC) - newRaftNode(4, append(clus.peers, newNodeURL), true, nil, proposeC, confChangeC) + snapshotLogger := zap.NewExample() + snapshotStorage, err := newSnapshotStorage(snapshotLogger, snapdir) + if err != nil { + log.Fatalf("raftexample: %v", err) + } + + startRaftNode( + id, append(clus.peerNames, newNodeURL), true, + nullFSM{}, snapshotStorage, + proposeC, confChangeC, + ) go func() { proposeC <- "foo" }() - if c, ok := <-clus.commitC[0]; !ok || c.data[0] != "foo" { + if c, ok := <-clus.peers[0].node.commitC; !ok || c.data[0] != "foo" { t.Fatalf("Commit failed") } } +type snapshotWatcher struct { + nullFSM + C chan struct{} +} + +func (sw snapshotWatcher) TakeSnapshot() ([]byte, error) { + sw.C <- struct{}{} + return nil, nil +} + func TestSnapshot(t *testing.T) { prevDefaultSnapshotCount := defaultSnapshotCount prevSnapshotCatchUpEntriesN := snapshotCatchUpEntriesN @@ -275,20 +391,22 @@ func TestSnapshot(t *testing.T) { snapshotCatchUpEntriesN = prevSnapshotCatchUpEntriesN }() - clus := newCluster(3) + sw := snapshotWatcher{C: make(chan struct{})} + + clus := newCluster(sw, nullFSM{}, nullFSM{}) defer clus.closeNoErrors(t) go 
func() { - clus.proposeC[0] <- "foo" + clus.peers[0].proposeC <- "foo" }() - c := <-clus.commitC[0] + c := <-clus.peers[0].node.commitC select { - case <-clus.snapshotTriggeredC[0]: + case <-sw.C: t.Fatalf("snapshot triggered before applying done") default: } close(c.applyDoneC) - <-clus.snapshotTriggeredC[0] + <-sw.C } diff --git a/raftexample/transport/sockopt_unix.go b/raftexample/transport/sockopt_unix.go index 4e76bf95..7ec936b7 100644 --- a/raftexample/transport/sockopt_unix.go +++ b/raftexample/transport/sockopt_unix.go @@ -22,13 +22,13 @@ import ( "golang.org/x/sys/unix" ) -func setReusePort(network, address string, conn syscall.RawConn) error { +func setReusePort(_, _ string, conn syscall.RawConn) error { return conn.Control(func(fd uintptr) { syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1) }) } -func setReuseAddress(network, address string, conn syscall.RawConn) error { +func setReuseAddress(_, _ string, conn syscall.RawConn) error { return conn.Control(func(fd uintptr) { syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR, 1) }) diff --git a/raftexample/verify/verify.go b/raftexample/verify/verify.go index 0cc1b482..85aff5b0 100644 --- a/raftexample/verify/verify.go +++ b/raftexample/verify/verify.go @@ -20,12 +20,15 @@ import ( "strings" ) +//nolint:revive const ENV_VERIFY = "ETCD_VERIFY" type VerificationType string const ( - ENV_VERIFY_VALUE_ALL VerificationType = "all" + //nolint:revive + ENV_VERIFY_VALUE_ALL VerificationType = "all" + //nolint:revive ENV_VERIFY_VALUE_ASSERT VerificationType = "assert" ) diff --git a/raftexample/wal/version.go b/raftexample/wal/version.go index d4caae2a..bcf86e02 100644 --- a/raftexample/wal/version.go +++ b/raftexample/wal/version.go @@ -32,6 +32,8 @@ import ( // ReadWALVersion reads remaining entries from opened WAL and returns struct // that implements schema.WAL interface. +// +//nolint:revive func ReadWALVersion(w *WAL) (*walVersion, error) { _, _, ents, err := w.ReadAll() if err != nil { diff --git a/rafttest/interaction_env_handler_campaign.go b/rafttest/interaction_env_handler_campaign.go index bde5cc42..7cf970a0 100644 --- a/rafttest/interaction_env_handler_campaign.go +++ b/rafttest/interaction_env_handler_campaign.go @@ -26,6 +26,6 @@ func (env *InteractionEnv) handleCampaign(t *testing.T, d datadriven.TestData) e } // Campaign the node at the given index. -func (env *InteractionEnv) Campaign(t *testing.T, idx int) error { +func (env *InteractionEnv) Campaign(_ *testing.T, idx int) error { return env.Nodes[idx].Campaign() } diff --git a/rafttest/interaction_env_handler_log_level.go b/rafttest/interaction_env_handler_log_level.go index 2194c9ee..1778a660 100644 --- a/rafttest/interaction_env_handler_log_level.go +++ b/rafttest/interaction_env_handler_log_level.go @@ -22,7 +22,7 @@ import ( "github.com/cockroachdb/datadriven" ) -func (env *InteractionEnv) handleLogLevel(t *testing.T, d datadriven.TestData) error { +func (env *InteractionEnv) handleLogLevel(_ *testing.T, d datadriven.TestData) error { return env.LogLevel(d.CmdArgs[0].Key) } diff --git a/rawnode_test.go b/rawnode_test.go index 553d74fd..0d775868 100644 --- a/rawnode_test.go +++ b/rawnode_test.go @@ -37,7 +37,7 @@ type rawNodeAdapter struct { var _ Node = (*rawNodeAdapter)(nil) // TransferLeadership is to test when node specifies lead, which is pointless, can just be filled in. 
-func (a *rawNodeAdapter) TransferLeadership(ctx context.Context, lead, transferee uint64) { +func (a *rawNodeAdapter) TransferLeadership(_ context.Context, _, transferee uint64) { a.RawNode.TransferLeader(transferee) }
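
For illustration, here is a minimal sketch of a second state machine written against the `FSM` and `SnapshotStorage` interfaces that this patch introduces. It is not part of the patch: `counterFSM` and `runCounter` are hypothetical names, and the sketch assumes it is compiled into raftexample's `package main`, since `commit`, `startRaftNode`, and `newSnapshotStorage` are unexported there.

	// counter_fsm.go (hypothetical) — lives alongside main.go in
	// raftexample's package main.
	package main

	import (
		"encoding/json"
		"fmt"
		"strconv"
		"sync/atomic"

		"go.uber.org/zap"

		"go.etcd.io/raft/v3/raftpb"
	)

	// counterFSM sums integer proposals. It satisfies the FSM
	// interface defined in raft.go.
	type counterFSM struct {
		total int64
	}

	// TakeSnapshot encodes the current total as JSON.
	func (c *counterFSM) TakeSnapshot() ([]byte, error) {
		return json.Marshal(atomic.LoadInt64(&c.total))
	}

	// RestoreSnapshot replaces the total with the snapshotted value.
	func (c *counterFSM) RestoreSnapshot(snapshot []byte) error {
		var total int64
		if err := json.Unmarshal(snapshot, &total); err != nil {
			return err
		}
		atomic.StoreInt64(&c.total, total)
		return nil
	}

	// ApplyCommits parses each committed proposal as an integer, adds
	// it to the total, then signals completion by closing applyDoneC,
	// as kvfsm does.
	func (c *counterFSM) ApplyCommits(commit *commit) error {
		for _, data := range commit.data {
			n, err := strconv.ParseInt(data, 10, 64)
			if err != nil {
				return fmt.Errorf("could not parse proposal %q: %w", data, err)
			}
			atomic.AddInt64(&c.total, n)
		}
		close(commit.applyDoneC)
		return nil
	}

	// runCounter wires the counter FSM to a single-node raft instance,
	// mirroring the setup in main.go.
	func runCounter() error {
		proposeC := make(chan string)
		confChangeC := make(chan raftpb.ConfChange)
		defer close(confChangeC)

		storage, err := newSnapshotStorage(zap.NewExample(), "counter-1-snap")
		if err != nil {
			return err
		}

		rc := startRaftNode(
			1, []string{"http://127.0.0.1:9021"}, false,
			&counterFSM{}, storage,
			proposeC, confChangeC,
		)

		go func() {
			proposeC <- "42" // replicated via raft, then applied by ApplyCommits
			close(proposeC)  // closing proposeC shuts the node down
		}()

		// Runs until the commit channel closes, then reports any error.
		return rc.ProcessCommits()
	}

The point of the sketch is that, after this patch, swapping the application state machine no longer requires touching raft.go: only the FSM implementation and the wiring in main.go change.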