raft_test.go
package libp2praft

import (
	"context"
	"fmt"
	"io"
	"log"
	"testing"
	"time"

	"github.com/hashicorp/raft"
	"github.com/libp2p/go-libp2p"
	consensus "github.com/libp2p/go-libp2p-consensus"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
)

var raftTmpFolder = "testing_tmp"
var raftQuiet = true

type raftState struct {
	Msg string
}

type testOperation struct {
	Append string
}

func (o testOperation) ApplyTo(s consensus.State) (consensus.State, error) {
	raftSt := s.(*raftState)
	return &raftState{Msg: raftSt.Msg + o.Append}, nil
}
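
// A minimal sketch (added for illustration, not part of the original
// suite): ApplyTo is a pure function from state to state, so operations
// compose by feeding the returned state into the next call.
func TestApplyToComposes(t *testing.T) {
	var st consensus.State = &raftState{Msg: "a"}
	st, _ = testOperation{Append: "b"}.ApplyTo(st)
	st, _ = testOperation{Append: "c"}.ApplyTo(st)
	if msg := st.(*raftState).Msg; msg != "abc" {
		t.Fatalf("expected \"abc\", got %q", msg)
	}
}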

// waitForLeader blocks until a leader has been elected, or fails the
// test after 10 seconds.
func waitForLeader(t *testing.T, r *raft.Raft) {
	obsCh := make(chan raft.Observation, 1)
	observer := raft.NewObserver(obsCh, false, nil)
	r.RegisterObserver(observer)
	defer r.DeregisterObserver(observer)

	// Raft does not allow observing the leader directly. What's worse,
	// there may be no notification that a new leader was elected,
	// because observations are emitted before the leader is set and
	// only when the RaftState has changed. Therefore, we also poll with
	// a ticker.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	ticker := time.NewTicker(time.Second / 2)
	defer ticker.Stop()
	for {
		select {
		case obs := <-obsCh:
			switch obs.Data.(type) {
			case raft.RaftState:
				if leaderAddr, _ := r.LeaderWithID(); leaderAddr != "" {
					return
				}
			}
		case <-ticker.C:
			if leaderAddr, _ := r.LeaderWithID(); leaderAddr != "" {
				return
			}
		case <-ctx.Done():
			t.Fatal("timed out waiting for Leader")
		}
	}
}

func shutdown(t *testing.T, r *raft.Raft) {
	err := r.Shutdown().Error()
	if err != nil {
		t.Fatal(err)
	}
}

// makeTestingRaft creates a quick Raft instance for testing.
func makeTestingRaft(t *testing.T, h host.Host, pids []peer.ID, op consensus.Op) (*raft.Raft, *Consensus, *raft.NetworkTransport) {
	// -- Create the consensus with no actor attached
	var consensus *Consensus
	if op != nil {
		consensus = NewOpLog(&raftState{}, op)
	} else {
		consensus = NewConsensus(&raftState{"i am not consensuated"})
	}
	// --

	// -- Create the Raft servers configuration
	servers := make([]raft.Server, len(pids))
	for i, pid := range pids {
		servers[i] = raft.Server{
			Suffrage: raft.Voter,
			ID:       raft.ServerID(pid.String()),
			Address:  raft.ServerAddress(pid.String()),
		}
	}
	serverConfig := raft.Configuration{
		Servers: servers,
	}
	// --

	// -- Create the LibP2P transport for Raft
	transport, err := NewLibp2pTransport(h, 2*time.Second)
	if err != nil {
		t.Fatal(err)
	}
	// --

	// -- Configuration
	config := raft.DefaultConfig()
	if raftQuiet {
		config.LogOutput = io.Discard
		config.Logger = nil
	}
	config.LocalID = raft.ServerID(h.ID().String())
	// --

	// -- SnapshotStore
	snapshots, err := raft.NewFileSnapshotStore(raftTmpFolder, 3, nil)
	if err != nil {
		t.Fatal(err)
	}

	// -- Log store and stable store: we use inmem.
	logStore := raft.NewInmemStore()
	// --

	// -- Bootstrap everything if necessary
	bootstrapped, err := raft.HasExistingState(logStore, logStore, snapshots)
	if err != nil {
		t.Fatal(err)
	}
	if !bootstrapped {
		// Bootstrap cluster first
		raft.BootstrapCluster(config, logStore, logStore, snapshots, transport, serverConfig)
	} else {
		t.Log("Already initialized!")
	}
	// --

	// Create the Raft instance. Our consensus.FSM() provides the
	// raft.FSM implementation.
	raft, err := raft.NewRaft(config, consensus.FSM(), logStore, logStore, snapshots, transport)
	if err != nil {
		t.Fatal(err)
	}
	return raft, consensus, transport
}
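
// A hedged usage sketch (added for illustration, not part of the
// original suite): wiring makeTestingRaft together with waitForLeader
// and shutdown for a single-node cluster. A lone voter elects itself,
// so waitForLeader returns quickly. The listen address and the test
// name are assumptions.
func TestSingleNodeLeader(t *testing.T) {
	h, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		t.Fatal(err)
	}
	defer h.Close()

	r, _, transport := makeTestingRaft(t, h, []peer.ID{h.ID()}, nil)
	defer transport.Close()
	defer shutdown(t, r)

	waitForLeader(t, r) // a single voter wins its own election
}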

func Example_consensus() {
	// This example shows how to use go-libp2p-raft to create a cluster
	// which agrees on a State. In order to do it, it defines a state,
	// creates three Raft nodes and launches them. We call a function
	// which lets the cluster leader repeatedly update the state. At the
	// end of the execution we verify that all members have agreed on
	// the same state.
	//
	// Some error handling has been omitted for simplicity.

	// Declare an object which represents the State.
	// Note that State objects should have public/exported fields,
	// as they are [de]serialized.
	type raftState struct {
		Value int
	}

	// error handling omitted
	newPeer := func(listenPort int) host.Host {
		h, _ := libp2p.New(
			libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
		)
		return h
	}

	// Create the peers and make sure they know about each other.
	peer1 := newPeer(9997)
	peer2 := newPeer(9998)
	peer3 := newPeer(9999)
	defer peer1.Close()
	defer peer2.Close()
	defer peer3.Close()
	peer1.Peerstore().AddAddrs(peer2.ID(), peer2.Addrs(), peerstore.PermanentAddrTTL)
	peer1.Peerstore().AddAddrs(peer3.ID(), peer3.Addrs(), peerstore.PermanentAddrTTL)
	peer2.Peerstore().AddAddrs(peer1.ID(), peer1.Addrs(), peerstore.PermanentAddrTTL)
	peer2.Peerstore().AddAddrs(peer3.ID(), peer3.Addrs(), peerstore.PermanentAddrTTL)
	peer3.Peerstore().AddAddrs(peer1.ID(), peer1.Addrs(), peerstore.PermanentAddrTTL)
	peer3.Peerstore().AddAddrs(peer2.ID(), peer2.Addrs(), peerstore.PermanentAddrTTL)

	// Create the consensus instances and initialize them with a state.
	// Note that the state is just used for local initialization: only
	// states submitted via CommitState() alter the state of the cluster.
	consensus1 := NewConsensus(&raftState{3})
	consensus2 := NewConsensus(&raftState{3})
	consensus3 := NewConsensus(&raftState{3})

	// Create the LibP2P transports for Raft
	transport1, err := NewLibp2pTransport(peer1, time.Minute)
	if err != nil {
		fmt.Println(err)
		return
	}
	transport2, err := NewLibp2pTransport(peer2, time.Minute)
	if err != nil {
		fmt.Println(err)
		return
	}
	transport3, err := NewLibp2pTransport(peer3, time.Minute)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer transport1.Close()
	defer transport2.Close()
	defer transport3.Close()

	// Create the Raft servers configuration for bootstrapping the
	// cluster. Note that both the ID and the Address are set to the
	// peer ID.
	servers := make([]raft.Server, 0)
	for _, h := range []host.Host{peer1, peer2, peer3} {
		servers = append(servers, raft.Server{
			Suffrage: raft.Voter,
			ID:       raft.ServerID(h.ID().String()),
			Address:  raft.ServerAddress(h.ID().String()),
		})
	}
	serversCfg := raft.Configuration{Servers: servers}

	// Create the Raft configs. The local ID is the peer ID.
	config1 := raft.DefaultConfig()
	config1.LogOutput = io.Discard
	config1.Logger = nil
	config1.LocalID = raft.ServerID(peer1.ID().String())
	config2 := raft.DefaultConfig()
	config2.LogOutput = io.Discard
	config2.Logger = nil
	config2.LocalID = raft.ServerID(peer2.ID().String())
	config3 := raft.DefaultConfig()
	config3.LogOutput = io.Discard
	config3.Logger = nil
	config3.LocalID = raft.ServerID(peer3.ID().String())

	// Create the snapshot stores. Use FileSnapshotStore in production.
	snapshots1 := raft.NewInmemSnapshotStore()
	snapshots2 := raft.NewInmemSnapshotStore()
	snapshots3 := raft.NewInmemSnapshotStore()

	// Create the InmemStores for use as log store and stable store.
	logStore1 := raft.NewInmemStore()
	logStore2 := raft.NewInmemStore()
	logStore3 := raft.NewInmemStore()

	// Bootstrap the stores with the server configurations.
	raft.BootstrapCluster(config1, logStore1, logStore1, snapshots1, transport1, serversCfg.Clone())
	raft.BootstrapCluster(config2, logStore2, logStore2, snapshots2, transport2, serversCfg.Clone())
	raft.BootstrapCluster(config3, logStore3, logStore3, snapshots3, transport3, serversCfg.Clone())

	// Create the Raft objects. Our consensus provides an implementation
	// of raft.FSM.
	raft1, err := raft.NewRaft(config1, consensus1.FSM(), logStore1, logStore1, snapshots1, transport1)
	if err != nil {
		log.Fatal(err)
	}
	raft2, err := raft.NewRaft(config2, consensus2.FSM(), logStore2, logStore2, snapshots2, transport2)
	if err != nil {
		log.Fatal(err)
	}
	raft3, err := raft.NewRaft(config3, consensus3.FSM(), logStore3, logStore3, snapshots3, transport3)
	if err != nil {
		log.Fatal(err)
	}

	// Create the actors using the Raft nodes
	actor1 := NewActor(raft1)
	actor2 := NewActor(raft2)
	actor3 := NewActor(raft3)

	// Set the actors so that we can CommitState() and GetCurrentState()
	consensus1.SetActor(actor1)
	consensus2.SetActor(actor2)
	consensus3.SetActor(actor3)

	// This function updates the cluster state, committing 1000 updates.
	updateState := func(c *Consensus) {
		nUpdates := 0
		for {
			if nUpdates >= 1000 {
				break
			}
			newState := &raftState{nUpdates * 2}

			// CommitState() blocks until the state has been
			// agreed upon by everyone.
			agreedState, err := c.CommitState(newState)
			if err != nil {
				fmt.Println(err)
				continue
			}
			if agreedState == nil {
				fmt.Println("agreedState is nil: committed on a non-leader?")
				continue
			}
			agreedRaftState := agreedState.(*raftState)
			nUpdates++
			if nUpdates%200 == 0 {
				fmt.Printf("Performed %d updates. Current state value: %d\n",
					nUpdates, agreedRaftState.Value)
			}
		}
	}

	// Provide some time for leader election
	time.Sleep(5 * time.Second)

	// Run the 1000 updates on the leader.
	// Barrier() would wait until all updates have been applied.
	if actor1.IsLeader() {
		updateState(consensus1)
	} else if actor2.IsLeader() {
		updateState(consensus2)
	} else if actor3.IsLeader() {
		updateState(consensus3)
	}

	// Wait for the updates to arrive everywhere.
	time.Sleep(5 * time.Second)

	// Shut down Raft and wait for it to complete (ignoring errors).
	raft1.Shutdown().Error()
	raft2.Shutdown().Error()
	raft3.Shutdown().Error()

	// Final states
	finalState1, err := consensus1.GetCurrentState()
	if err != nil {
		fmt.Println(err)
		return
	}
	finalState2, err := consensus2.GetCurrentState()
	if err != nil {
		fmt.Println(err)
		return
	}
	finalState3, err := consensus3.GetCurrentState()
	if err != nil {
		fmt.Println(err)
		return
	}
	finalRaftState1 := finalState1.(*raftState)
	finalRaftState2 := finalState2.(*raftState)
	finalRaftState3 := finalState3.(*raftState)

	fmt.Printf("Raft1 final state: %d\n", finalRaftState1.Value)
	fmt.Printf("Raft2 final state: %d\n", finalRaftState2.Value)
	fmt.Printf("Raft3 final state: %d\n", finalRaftState3.Value)

	// Output:
	// Performed 200 updates. Current state value: 398
	// Performed 400 updates. Current state value: 798
	// Performed 600 updates. Current state value: 1198
	// Performed 800 updates. Current state value: 1598
	// Performed 1000 updates. Current state value: 1998
	// Raft1 final state: 1998
	// Raft2 final state: 1998
	// Raft3 final state: 1998
}