-
Notifications
You must be signed in to change notification settings - Fork 1
/
mutex.go
110 lines (94 loc) · 2.23 KB
/
mutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package etcdplus
import (
"errors"
// "fmt"
"github.com/coreos/go-etcd/etcd"
"time"
)
const (
UNLOCK_VALUE = "unlocked"
)
type Mutex struct {
// key is a well-known string that nodes can use to
// access the mutex.
// lockVal is specific to a node. When a node locks
// a mutex, the value is set to lockVal; since lockVal
// is specific to that node, other nodes won't be able
// to unlock the mutex.
client *etcd.Client
key string
lockVal string
}
func NewMutex(client *etcd.Client, key string) *Mutex {
// Generate a random key if no key is given.
// The user should inspect the returned mutex
// and tell other nodes about the key.
if key == "" {
key = getUUID()
}
mutex := Mutex{
client: client,
key: key,
lockVal: getUUID(),
}
return &mutex
}
func (m *Mutex) Lock(timeout time.Duration) error {
timeoutChan := getTimeoutChan(timeout)
successChan := make(chan bool)
// Stop any outstanding watches at the end
stopChan := make(chan bool, 1)
defer func() {
stopChan <- true
}()
var lockHelper func()
lockHelper = func() {
resps, err := m.client.Get(m.key)
if err != nil {
etcdErr := err.(etcd.EtcdError)
if etcdErr.ErrorCode == 100 {
// Create the mutex.
// A mutex might be missing for two reasons:
// 1. This is the first time anyone ever tries to
// lock the mutex.
// 2. Some node died when holding the mutex.
m.client.TestAndSet(m.key, "", UNLOCK_VALUE, 0)
// Restart
lockHelper()
return
}
panic(err)
}
resp := resps[0]
for {
sinceIndex := resp.Index + 1
// Try to lock if it's unlocked
if resp.Value == UNLOCK_VALUE {
_, success, _ := m.client.TestAndSet(m.key,
UNLOCK_VALUE, m.lockVal, 0)
if success {
successChan <- true
return
}
}
// Wait till it's changed
resp, err = m.client.Watch(m.key, sinceIndex, nil, stopChan)
if err != nil {
// The reason is likely that stopChan was used
return
}
}
}
go lockHelper()
select {
case <-timeoutChan:
return errors.New("Timeout")
case <-successChan:
return nil
}
}
func (m *Mutex) Unlock() error {
_, _, err := m.client.TestAndSet(m.key, m.lockVal, UNLOCK_VALUE, 0)
// TODO: should we keep trying in case err is not nil?
return err
}