-
Notifications
You must be signed in to change notification settings - Fork 3
/
v2.go
169 lines (161 loc) · 4.62 KB
/
v2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package main
import (
"time"
"github.com/coreos/etcd/client"
"log"
"golang.org/x/net/context"
"fmt"
"path/filepath"
"bytes"
"strings"
"os"
"io/ioutil"
)
/*
Monitoring changes in etcd server.
It designed for run in separate goroutine.
*/
func etcdMon_v2(etcdRootPath string, config client.Config, bus chan fileChangeEvent, startIndex uint64) {
c, err := client.New(config)
if err != nil {
panic(err)
}
kapi := client.NewKeysAPI(c)
var nextEvent uint64 = startIndex
for {
response, err := kapi.Watcher(etcdRootPath, &client.WatcherOptions{AfterIndex: nextEvent, Recursive: true}).Next(context.Background())
if err != nil {
log.Println(err)
time.Sleep(time.Second)
continue
}
nextEvent = response.Index
if response.Action == "delete" {
bus <- fileChangeEvent{Path: response.Node.Key, IsRemoved: true, IsDir: response.Node.Dir}
continue
}
if response.Node.Dir {
bus <- fileChangeEvent{Path: response.Node.Key, IsDir: response.Node.Dir}
continue
}
bus <- fileChangeEvent{Path: response.Node.Key, Content: []byte(response.Node.Value)}
}
}
/*
Clear dir and dump content of etcd to the dir.
ATTENTION: the function REMOVE ALL CONTENT of the dir.
*/
func firstSyncEtcDir_v2(etcdRootPath string, etcdConfig client.Config, dir string) (etcdIndex uint64) {
cleanDir(dir)
etcdClient, err := client.New(etcdConfig)
if err != nil {
log.Println("Can't create etcdClient: ", err)
panic(err)
}
kapi := client.NewKeysAPI(etcdClient)
response, err := kapi.Get(context.Background(), etcdRootPath, &client.GetOptions{Recursive: true, Quorum: true})
if err != nil {
fmt.Println("I can't get initial etcd state: ", err)
panic(err)
}
writeNodeToDir(dir, etcdRootPath, response.Node)
etcdIndex = response.Index
return
}
/*
function for replicate changes between etcd and file system.
It is never returned function.
It can be run in separate goroutine or call it as last function in main()
*/
func syncProcess_v2(dir, etcdRootDir string, etcdConfig client.Config, etcdChan, fsChan <-chan fileChangeEvent) {
etcdClient, err := client.New(etcdConfig)
if err != nil {
panic(err)
}
kapi := client.NewKeysAPI(etcdClient)
ctx := context.Background()
fsMarkFile := filepath.Join(dir, MARK_FILE_NAME)
for {
var event fileChangeEvent
select {
case event = <-etcdChan:
fsPath := filepath.Join(dir, event.Path)
if fsPath == fsMarkFile {
continue
}
switch {
case event.IsRemoved:
err := os.RemoveAll(fsPath)
if err != nil {
log.Println("Can't remove: ", fsPath, err)
}
case event.IsDir:
err := os.Mkdir(fsPath, DEFAULT_DIRMODE)
if err != nil && !os.IsExist(err){
log.Println("Can't make dir: ", fsPath, err)
}
default:
if content, err := ioutil.ReadFile(fsPath); err == nil {
if bytes.Equal(content, event.Content) {
// Skip if contents are equals
continue
}
}
err := ioutil.WriteFile(fsPath, event.Content, DEFAULT_FILEMODE)
if err != nil {
log.Println("Can't write file: ", fsPath, err)
}
}
case event = <-fsChan:
if event.Path == fsMarkFile {
continue
}
etcdPath := etcdRootDir + event.Path[len(dir):]
etcdPath = strings.Replace(etcdPath, "\\", "/", -1)
switch {
case event.IsRemoved:
_, err := kapi.Delete(ctx, etcdPath, &client.DeleteOptions{Recursive: true})
if err != nil {
log.Println("Can't remove etcd: "+etcdPath, err)
}
case event.IsDir:
_, err := kapi.Set(ctx, etcdPath, "", &client.SetOptions{Dir: true})
if err != nil {
log.Println("Can't create etcd dir: ", etcdPath, err)
}
default:
if resp, err := kapi.Get(ctx, etcdPath, &client.GetOptions{Quorum: true}); err == nil {
if bytes.Equal([]byte(resp.Node.Value), event.Content) {
// Skip equal contents
continue
}
}
_, err := kapi.Set(ctx, etcdPath, string(event.Content), nil)
if err != nil {
log.Println("Can't set etcd value: ", etcdPath, err)
}
}
}
}
}
func writeNodeToDir(dir, root string, node *client.Node) {
//log.Println("I can't create dir: ", node.Key," ", dir, "root:", root)
nodePath := filepath.Join(dir, node.Key[len(root)-1:])
//log.Println("I can't create dir: ", nodePath, " ", node.Key," ", dir, " ", root)
if node.Dir {
err := os.Mkdir(nodePath, DEFAULT_DIRMODE)
if err != nil && !os.IsExist(err) {
log.Println("I can't create dir: ", nodePath, " ", node.Key," ", dir, " ", root)
panic(err)
}
for _, item := range node.Nodes {
writeNodeToDir(dir, root, item)
}
} else {
err := ioutil.WriteFile(nodePath, []byte(node.Value), DEFAULT_FILEMODE)
if err != nil {
log.Println("I can't create file: ", nodePath)
panic(err)
}
}
}