Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor raftexample using interfaces to better separate concerns #1

Open
wants to merge 32 commits into
base: raft-example
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2de1d3c
Add a Linux implementation of file locking
mhagger Jul 19, 2023
adc2baf
Fix/suppress some linter warnings
mhagger Jul 18, 2023
a106937
Define `SnapshotStorage` interface
mhagger Mar 8, 2023
dab4875
Define `KVStore` interface
mhagger Mar 8, 2023
5e02aa3
raftNode: initialize `snapshotStorage` in `newRaftNode()`
mhagger Mar 8, 2023
ce5fd50
newRaftNode(): inline part of the goroutine's work
mhagger Mar 8, 2023
0513333
startRaftNode(): replacement for `newRaftNode()`
mhagger Mar 8, 2023
4f23ef4
raftNode.snapdir: remove member
mhagger Mar 8, 2023
e13d27d
raftNode.id: convert type to `uint64`
mhagger Mar 8, 2023
6c51b15
startRaftNode(): take the `SnapshotStorage` as an argument
mhagger Mar 8, 2023
81cfae6
kvstore.loadAndApplySnapshot(), applyCommits(): extract methods
mhagger Mar 8, 2023
67e3892
kvstore: separate initialization from startup
mhagger Mar 8, 2023
b928307
kvstore.loadSnapshot(): inline method
mhagger Mar 8, 2023
a4bad71
FSM: new interface, representing a finite state machine
mhagger Mar 8, 2023
3062558
Move more functionality from `kvstore` to `kvfsm`
mhagger Mar 8, 2023
093d670
TestProposeOnCommit(): add some clarifying comments
mhagger Mar 8, 2023
317f7c1
raftexample_test: introduce `peer` type
mhagger Mar 8, 2023
5e6216d
raftexample_test: give each `peer` its own `FSM`
mhagger Mar 9, 2023
88c1adf
kvfsm.applyCommits(): return an error
mhagger Mar 9, 2023
d368f8f
FSM.ProcessCommits(): return an error rather than calling `log.Fatal()`
mhagger Mar 9, 2023
4c4fd23
FSM.ApplyCommits(): new method
mhagger Mar 9, 2023
51e4dc4
newKVStore(): don't call `LoadAndApplySnapshot()`
mhagger Mar 9, 2023
ec0d6e2
Make `ProcessCommits()` a method of `raftNode`
mhagger Mar 9, 2023
1a3e303
newRaftNode(): call `LoadAndApplySnapshot()` here
mhagger Mar 9, 2023
5a1eaee
LoadAndApplySnapshot(): move method to `raftNode` and make it private
mhagger Mar 9, 2023
2b78781
raftNode: add a new and better way to tell when the node is done
mhagger Mar 9, 2023
3e4b1c4
serveHTTPKVAPI(): monitor the raft node using its "done" channel
mhagger Mar 9, 2023
4f0c2ef
cluster.Close(): read any error from the node directly
mhagger Mar 9, 2023
0504378
TestProposeOnCommit(): read any error from the node directly
mhagger Mar 9, 2023
75331cd
cluster.Cleanup(): new method, extracted from `Close()`
mhagger Mar 11, 2023
41c3f43
TestProposeOnCommit(): change test to use `ProcessCommits()`
mhagger Mar 11, 2023
b4562f9
newRaftNode(): don't return `commitC` and `errorC`
mhagger Mar 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
pb "go.etcd.io/raft/v3/raftpb"
)

func applyToStore(ents []pb.Entry) {}
func sendMessages(msgs []pb.Message) {}
func saveStateToDisk(st pb.HardState) {}
func saveToDisk(ents []pb.Entry) {}
func applyToStore(_ []pb.Entry) {}
func sendMessages(_ []pb.Message) {}
func saveStateToDisk(_ pb.HardState) {}
func saveToDisk(_ []pb.Entry) {}

func ExampleNode() {
c := &Config{}
Expand Down
2 changes: 1 addition & 1 deletion node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ type ignoreSizeHintMemStorage struct {
*MemoryStorage
}

func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, _ uint64) ([]raftpb.Entry, error) {
return s.MemoryStorage.Entries(lo, hi, math.MaxUint64)
}

Expand Down
2 changes: 2 additions & 0 deletions raftexample/etcdserverpb/raft_internal_stringer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type txnRequestStringer struct {
Request *TxnRequest
}

//nolint:revive
func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
return &txnRequestStringer{request}
}
Expand Down Expand Up @@ -167,6 +168,7 @@ type loggablePutRequest struct {
IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
}

//nolint:revive
func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
return &loggablePutRequest{
request.Key,
Expand Down
92 changes: 92 additions & 0 deletions raftexample/fileutil/lock_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux

package fileutil

import (
"fmt"
"io"
"os"
"syscall"

"golang.org/x/sys/unix"
)

// This used to call syscall.Flock() but that call fails with EBADF on NFS.
// An alternative is lockf() which works on NFS but that call lets a process lock
// the same file twice. Instead, use Linux's non-standard open file descriptor
// locks which will block if the process already holds the file lock.

var (
wrlck = syscall.Flock_t{
Type: syscall.F_WRLCK,
Whence: int16(io.SeekStart),
Start: 0,
Len: 0,
}

linuxTryLockFile = flockTryLockFile
linuxLockFile = flockLockFile
)

func init() {
// use open file descriptor locks if the system supports it
getlk := syscall.Flock_t{Type: syscall.F_RDLCK}
if err := syscall.FcntlFlock(0, unix.F_OFD_GETLK, &getlk); err == nil {
linuxTryLockFile = ofdTryLockFile
linuxLockFile = ofdLockFile
}
}

func TryLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
return linuxTryLockFile(path, flag, perm)
}

func ofdTryLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
f, err := os.OpenFile(path, flag, perm)
if err != nil {
return nil, fmt.Errorf("ofdTryLockFile failed to open %q (%v)", path, err)
}

flock := wrlck
if err = syscall.FcntlFlock(f.Fd(), unix.F_OFD_SETLK, &flock); err != nil {
f.Close()
if err == syscall.EWOULDBLOCK {
err = ErrLocked
}
return nil, err
}
return &LockedFile{f}, nil
}

func LockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
return linuxLockFile(path, flag, perm)
}

func ofdLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
f, err := os.OpenFile(path, flag, perm)
if err != nil {
return nil, fmt.Errorf("ofdLockFile failed to open %q (%v)", path, err)
}

flock := wrlck
err = syscall.FcntlFlock(f.Fd(), unix.F_OFD_SETLKW, &flock)
if err != nil {
f.Close()
return nil, err
}
return &LockedFile{f}, nil
}
24 changes: 19 additions & 5 deletions raftexample/httpapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,22 @@ import (
"go.etcd.io/raft/v3/raftpb"
)

// KVStore is the interface to the key-value store that is required by
// `httpKVAPI`.
type KVStore interface {
// Propose proposes to set key `k` to value `v` in the key-value
// store. It returns immediately, before the change has
// necessarily taken effect.
Propose(k, v string)

// Lookup looks up the value for key `k` in the current state of
// the store.
Lookup(k string) (string, bool)
}

// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
store *kvstore
store KVStore
confChangeC chan<- raftpb.ConfChange
}

Expand Down Expand Up @@ -101,7 +114,9 @@ func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// serveHTTPKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHTTPKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
func serveHTTPKVAPI(
kv KVStore, port int, confChangeC chan<- raftpb.ConfChange, done <-chan struct{},
) {
srv := http.Server{
Addr: ":" + strconv.Itoa(port),
Handler: &httpKVAPI{
Expand All @@ -116,7 +131,6 @@ func serveHTTPKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange,
}()

// exit when raft goes down
if err, ok := <-errorC; ok {
log.Fatal(err)
}
<-done
_ = srv.Close()
}
121 changes: 52 additions & 69 deletions raftexample/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,36 @@ import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"log"
"strings"
"sync"

"go.etcd.io/raft/v3/raftexample/snap"
"go.etcd.io/raft/v3/raftpb"
)

// a key-value store backed by raft
type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
snapshotter *snap.Snapshotter
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
}

type kv struct {
Key string
Val string
}

func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore {
s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
snapshot, err := s.loadSnapshot()
if err != nil {
log.Panic(err)
// newKVStore creates and returns a new `kvstore`. The second return
// value can be used as the finite state machine that is driven by a
// `raftNode`.
func newKVStore(proposeC chan<- string) (*kvstore, kvfsm) {
s := &kvstore{
proposeC: proposeC,
kvStore: make(map[string]string),
}
if snapshot != nil {
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
fsm := kvfsm{
kvs: s,
}
// read commits from raft into kvStore map until error
go s.readCommits(commitC, errorC)
return s
return s, fsm
}

func (s *kvstore) Lookup(key string) (string, bool) {
Expand All @@ -71,64 +65,53 @@ func (s *kvstore) Propose(k string, v string) {
s.proposeC <- buf.String()
}

func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
for commit := range commitC {
if commit == nil {
// signaled to load snapshot
snapshot, err := s.loadSnapshot()
if err != nil {
log.Panic(err)
}
if snapshot != nil {
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
}
continue
}

for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
}
close(commit.applyDoneC)
}
if err, ok := <-errorC; ok {
log.Fatal(err)
}
// Set sets a single value. It should only be called by `kvfsm`.
func (s *kvstore) set(k, v string) {
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore[k] = v
}

func (s *kvstore) getSnapshot() ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return json.Marshal(s.kvStore)
func (s *kvstore) restoreFromSnapshot(store map[string]string) {
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore = store
}

func (s *kvstore) loadSnapshot() (*raftpb.Snapshot, error) {
snapshot, err := s.snapshotter.Load()
if err == snap.ErrNoSnapshot {
return nil, nil
}
if err != nil {
return nil, err
}
return snapshot, nil
// kvfsm implements the `FSM` interface for the underlying `*kvstore`.
type kvfsm struct {
kvs *kvstore
}

func (s *kvstore) recoverFromSnapshot(snapshot []byte) error {
// RestoreSnapshot restores the current state of the KV store to the
// value encoded in `snapshot`.
func (fsm kvfsm) RestoreSnapshot(snapshot []byte) error {
var store map[string]string
if err := json.Unmarshal(snapshot, &store); err != nil {
return err
}
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore = store
fsm.kvs.restoreFromSnapshot(store)
return nil
}

func (fsm kvfsm) TakeSnapshot() ([]byte, error) {
fsm.kvs.mu.RLock()
defer fsm.kvs.mu.RUnlock()
return json.Marshal(fsm.kvs.kvStore)
}

// ApplyCommits decodes and applies each of the commits in `commit` to
// the current state, then signals that it is done by closing
// `commit.applyDoneC`.
func (fsm kvfsm) ApplyCommits(commit *commit) error {
for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {
return fmt.Errorf("could not decode message: %w", err)
}
fsm.kvs.set(dataKv.Key, dataKv.Val)
}
close(commit.applyDoneC)
return nil
}
7 changes: 5 additions & 2 deletions raftexample/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ import (
func Test_kvstore_snapshot(t *testing.T) {
tm := map[string]string{"foo": "bar"}
s := &kvstore{kvStore: tm}
fsm := kvfsm{
kvs: s,
}

v, _ := s.Lookup("foo")
if v != "bar" {
t.Fatalf("foo has unexpected value, got %s", v)
}

data, err := s.getSnapshot()
data, err := fsm.TakeSnapshot()
if err != nil {
t.Fatal(err)
}
s.kvStore = nil

if err := s.recoverFromSnapshot(data); err != nil {
if err := fsm.RestoreSnapshot(data); err != nil {
t.Fatal(err)
}
v, _ = s.Lookup("foo")
Expand Down
Loading