-
Notifications
You must be signed in to change notification settings - Fork 22
/
actor.go
95 lines (81 loc) · 2.68 KB
/
actor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package libp2praft
import (
"errors"
"time"
"github.com/hashicorp/raft"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"
)
// SetStateTimeout specifies how long before giving up on setting a state
var SetStateTimeout = 1 * time.Second
// Actor implements a consensus.Actor, allowing to SetState
// in a libp2p Consensus system. In order to do this it uses hashicorp/raft
// implementation of the Raft algorithm.
type Actor struct {
Raft *raft.Raft
}
// NewActor returns a new actor given a hashicorp/raft node.
func NewActor(r *raft.Raft) *Actor {
return &Actor{
Raft: r,
}
}
// SetState attempts to set the state of the cluster to the state
// represented by the given Node. It will block until the state is
// commited, and will then return then the new state.
//
// This does not mean that the new state is already available in all
// the nodes in the cluster, but that it will be at some point because
// it is part of the authoritative log.
//
// Only the Raft leader can set the state. Otherwise, an error will
// be returned.
func (actor *Actor) SetState(newState consensus.State) (consensus.State, error) {
// figure out if this is an Op.
op, ok := newState.(consensus.Op)
if ok {
return actor.commitOp(op)
}
return actor.commitOp(&stateOp{newState})
}
// commitOp actually does the job of setting the state, which is simply
// an opConsensus operation with the new state. Everything stated for SetState
// applies here.
func (actor *Actor) commitOp(op consensus.Op) (consensus.State, error) {
//log.Debug("actor is applying state")
if actor.Raft == nil {
return nil, errors.New("this actor does not have a raft instance")
}
if !actor.IsLeader() {
return nil, errors.New("this actor is not the leader")
}
bs, err := encodeOp(op)
if err != nil {
return nil, err
}
applyFuture := actor.Raft.Apply(bs, SetStateTimeout)
// Error blocks until apply future is "considered commited"
// which means "commited to the local FSM"
err = applyFuture.Error()
futureResp := applyFuture.Response()
//log.Debugf("apply future log entry index: %d", applyFuture.Index())
return futureResp, err
}
// IsLeader returns of the current actor is Raft leader
func (actor *Actor) IsLeader() bool {
if actor.Raft != nil {
return actor.Raft.State() == raft.Leader
}
return false
}
// Leader returns the LibP2P ID of the Raft leader or an
// error if there is no leader.
func (actor *Actor) Leader() (peer.ID, error) {
// Leader as returned by Libp2pTransport.LocalAddr()
leaderAddr, _ := actor.Raft.LeaderWithID()
peerID, err := peer.Decode(string(leaderAddr))
if err != nil {
return "", errors.New("leader unknown or not existing yet")
}
return peerID, nil
}