-
Notifications
You must be signed in to change notification settings - Fork 113
/
cluster_topo.go
249 lines (213 loc) · 5.92 KB
/
cluster_topo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
package radix
import (
"bufio"
"fmt"
"io"
"net"
"sort"
"strconv"
"github.com/mediocregopher/radix/v3/resp"
"github.com/mediocregopher/radix/v3/resp/resp2"
)
type (
	// ClusterNode describes a single node in the cluster at a moment in time.
	ClusterNode struct {
		// Addr is the node's host:port address.
		Addr string
		// ID is the node's id. Older versions of redis might not actually send
		// back the id, so it may be blank.
		ID string

		// Slots lists the slot ranges served by the node. Within each range
		// the start is inclusive and the end is exclusive.
		Slots [][2]uint16

		// SecondaryOfAddr and SecondaryOfID are the address and id of the node
		// this one is a secondary of. They are blank for a primary.
		SecondaryOfAddr string
		SecondaryOfID   string
	}

	// ClusterTopo describes the cluster topology at a given moment. A
	// ClusterTopo produced by UnmarshalRESP is sorted first by slot number of
	// each node and then by secondary status, so primaries come before
	// secondaries.
	ClusterTopo []ClusterNode
)
// MarshalRESP implements the resp.Marshaler interface, and will marshal the
// ClusterTopo in the same format as the return from CLUSTER SLOTS.
//
// Nodes are grouped by slot range, producing one reply entry per distinct
// range. Within an entry, primaries are written before secondaries because
// readers of CLUSTER SLOTS (including UnmarshalRESP) treat the first node of
// an entry as the primary. This holds even if tt has not been sorted.
func (tt ClusterTopo) MarshalRESP(w io.Writer) error {
	m := map[[2]uint16]topoSlotSet{}
	for _, t := range tt {
		for _, slots := range t.Slots {
			tss := m[slots]
			tss.slots = slots
			tss.nodes = append(tss.nodes, t)
			m[slots] = tss
		}
	}

	allTSS := make([]topoSlotSet, 0, len(m))
	for _, tss := range m {
		nodes := tss.nodes
		// Move primaries to the front. The sort is stable, so the relative
		// order of the secondaries is kept as given.
		sort.SliceStable(nodes, func(i, j int) bool {
			return nodes[i].SecondaryOfAddr == "" && nodes[j].SecondaryOfAddr != ""
		})
		allTSS = append(allTSS, tss)
	}

	// we sort the topoSlotSets by their slot number so that the order is
	// deterministic, mostly so tests pass consistently, I'm not sure if actual
	// redis has any contract on the order
	sort.Slice(allTSS, func(i, j int) bool {
		return allTSS[i].slots[0] < allTSS[j].slots[0]
	})

	if err := (resp2.ArrayHeader{N: len(allTSS)}).MarshalRESP(w); err != nil {
		return err
	}
	for _, tss := range allTSS {
		if err := tss.MarshalRESP(w); err != nil {
			return err
		}
	}
	return nil
}
// UnmarshalRESP implements the resp.Unmarshaler interface, but only supports
// unmarshaling the return from CLUSTER SLOTS. The unmarshaled nodes will be
// sorted before they are returned.
//
// Any previous contents of tt are discarded: on success tt holds exactly the
// nodes described by the reply. On error tt is left unmodified, although the
// reply may have been only partially consumed from br.
func (tt *ClusterTopo) UnmarshalRESP(br *bufio.Reader) error {
	var arrHead resp2.ArrayHeader
	if err := arrHead.UnmarshalRESP(br); err != nil {
		return err
	}
	// Guard against a negative length, which would make the make() below
	// panic.
	if arrHead.N < 0 {
		return fmt.Errorf("expected a CLUSTER SLOTS array, got array length %d", arrHead.N)
	}

	slotSets := make([]topoSlotSet, arrHead.N)
	for i := range slotSets {
		if err := slotSets[i].UnmarshalRESP(br); err != nil {
			return err
		}
	}

	// A node serving several slot ranges appears once per range in the reply.
	// Merge those entries into a single ClusterNode keyed by address.
	nodeAddrM := map[string]ClusterNode{}
	for _, tss := range slotSets {
		for _, n := range tss.nodes {
			if existingN, ok := nodeAddrM[n.Addr]; ok {
				existingN.Slots = append(existingN.Slots, n.Slots...)
				nodeAddrM[n.Addr] = existingN
			} else {
				nodeAddrM[n.Addr] = n
			}
		}
	}

	// Build a fresh slice rather than appending to *tt, so that unmarshaling
	// into a previously used ClusterTopo doesn't keep stale or duplicate nodes.
	topo := make(ClusterTopo, 0, len(nodeAddrM))
	for _, n := range nodeAddrM {
		topo = append(topo, n)
	}
	topo.sort()
	*tt = topo
	return nil
}
// sort orders tt in place.
//
// It first sorts each node's Slots by starting slot. It then orders the nodes
// by their first slot range (start, then end), with primaries ahead of
// secondaries serving the same range. Ties are broken by address so the
// result is deterministic.
//
// Nodes with no slots are placed last rather than causing a panic.
// UnmarshalRESP never produces such nodes, but a hand-built ClusterTopo may.
func (tt ClusterTopo) sort() {
	for i := range tt {
		slots := tt[i].Slots
		sort.Slice(slots, func(a, b int) bool {
			return slots[a][0] < slots[b][0]
		})
	}

	sort.Slice(tt, func(i, j int) bool {
		a, b := &tt[i], &tt[j]

		aHasSlots, bHasSlots := len(a.Slots) > 0, len(b.Slots) > 0
		if aHasSlots != bHasSlots {
			return aHasSlots
		}
		if aHasSlots {
			if a.Slots[0][0] != b.Slots[0][0] {
				return a.Slots[0][0] < b.Slots[0][0]
			}
			if a.Slots[0][1] != b.Slots[0][1] {
				return a.Slots[0][1] < b.Slots[0][1]
			}
		}

		// We want secondaries to come after primaries. Both sides are checked
		// so this stays a strict weak ordering (as sort.Slice requires) even
		// when two primaries share a range.
		aPrimary, bPrimary := a.SecondaryOfAddr == "", b.SecondaryOfAddr == ""
		if aPrimary != bPrimary {
			return aPrimary
		}
		return a.Addr < b.Addr
	})
}
// Map indexes the topology by node address, returning a fresh map from each
// node's Addr to its ClusterNode. If several nodes share an address, the one
// appearing last in tt wins.
func (tt ClusterTopo) Map() map[string]ClusterNode {
	byAddr := make(map[string]ClusterNode, len(tt))
	for i := range tt {
		node := tt[i]
		byAddr[node.Addr] = node
	}
	return byAddr
}
// Primaries returns a new ClusterTopo holding only the primary nodes of tt,
// meaning those with a blank SecondaryOfAddr, in their original order. The
// result is never nil, even when tt is empty.
func (tt ClusterTopo) Primaries() ClusterTopo {
	primaries := make(ClusterTopo, 0, len(tt))
	for i := range tt {
		if tt[i].SecondaryOfAddr != "" {
			continue // secondaries are skipped
		}
		primaries = append(primaries, tt[i])
	}
	return primaries
}
// topoSlotSet is one entry of a CLUSTER SLOTS reply: a single slot range and
// the nodes serving it. It is an intermediate form used in both directions.
// ClusterTopo.MarshalRESP groups nodes into topoSlotSets by range before
// writing them. ClusterTopo.UnmarshalRESP reads topoSlotSets and merges them
// into ClusterNodes.
type topoSlotSet struct {
	// slots is the range [start, end): start inclusive, end exclusive.
	slots [2]uint16
	// nodes serving the range, in reply order. When unmarshaling, the first
	// one is taken as the primary and the rest as its secondaries.
	nodes []ClusterNode
}
// MarshalRESP writes tss as a single CLUSTER SLOTS reply entry. The entry
// contains:
//   - the start slot;
//   - the inclusive end slot (tss.slots[1] is exclusive, hence the -1);
//   - one [host, port, id] array per node, in the order of tss.nodes. The id
//     is omitted when blank.
//
// Every node address is split and validated before anything is written. An
// Addr that is not a valid host:port therefore returns the net.SplitHostPort
// (or port parsing) error instead of leaving a partially written entry on w.
func (tss topoSlotSet) MarshalRESP(w io.Writer) error {
	nodes := make([][]interface{}, 0, len(tss.nodes))
	for _, n := range tss.nodes {
		host, portStr, err := net.SplitHostPort(n.Addr)
		if err != nil {
			return err
		}
		port, err := strconv.Atoi(portStr)
		if err != nil {
			return err
		}
		node := []interface{}{host, port}
		if n.ID != "" {
			node = append(node, n.ID)
		}
		nodes = append(nodes, node)
	}

	// marshal records only the first write error; later calls become no-ops.
	var err error
	marshal := func(m resp.Marshaler) {
		if err == nil {
			err = m.MarshalRESP(w)
		}
	}

	marshal(resp2.ArrayHeader{N: 2 + len(nodes)})
	marshal(resp2.Any{I: tss.slots[0]})
	marshal(resp2.Any{I: tss.slots[1] - 1})
	for _, node := range nodes {
		marshal(resp2.Any{I: node})
	}
	return err
}
// UnmarshalRESP reads a single CLUSTER SLOTS reply entry into tss, replacing
// any previous contents. The entry is an array of the start slot, the
// inclusive end slot, and then one array per node serving the range. Each
// node array holds [ip, port, optional id, ...]; any further elements are
// discarded. The first node is recorded as the primary. Each later node is
// recorded as a secondary of it, with SecondaryOfAddr/SecondaryOfID set.
func (tss *topoSlotSet) UnmarshalRESP(br *bufio.Reader) error {
	var arrHead resp2.ArrayHeader
	if err := arrHead.UnmarshalRESP(br); err != nil {
		return err
	}
	// Without this check a short entry would make the slot reads below
	// consume elements belonging to whatever follows in the reply.
	if arrHead.N < 2 {
		return fmt.Errorf("expected at least 2 array elements in slot set, got %d", arrHead.N)
	}

	// first two array elements are the slot numbers. We increment the second to
	// preserve inclusive start/exclusive end, which redis doesn't
	for i := range tss.slots {
		if err := (resp2.Any{I: &tss.slots[i]}).UnmarshalRESP(br); err != nil {
			return err
		}
	}
	tss.slots[1]++
	arrHead.N -= len(tss.slots)

	tss.nodes = make([]ClusterNode, 0, arrHead.N)
	var primaryNode ClusterNode
	for nodeIdx := 0; nodeIdx < arrHead.N; nodeIdx++ {
		var nodeArrHead resp2.ArrayHeader
		if err := nodeArrHead.UnmarshalRESP(br); err != nil {
			return err
		} else if nodeArrHead.N < 2 {
			return fmt.Errorf("expected at least 2 array elements, got %d", nodeArrHead.N)
		}

		var ip resp2.BulkString
		if err := ip.UnmarshalRESP(br); err != nil {
			return err
		}

		var port resp2.Int
		if err := port.UnmarshalRESP(br); err != nil {
			return err
		}

		nodeArrHead.N -= 2

		// the id is optional; older redis versions may not send it
		var id resp2.BulkString
		if nodeArrHead.N > 0 {
			if err := id.UnmarshalRESP(br); err != nil {
				return err
			}
			nodeArrHead.N--
		}

		// discard anything after
		for extra := 0; extra < nodeArrHead.N; extra++ {
			if err := (resp2.Any{}).UnmarshalRESP(br); err != nil {
				return err
			}
		}

		node := ClusterNode{
			Addr:  net.JoinHostPort(ip.S, strconv.FormatInt(port.I, 10)),
			ID:    id.S,
			Slots: [][2]uint16{tss.slots},
		}

		if nodeIdx == 0 {
			primaryNode = node
		} else {
			node.SecondaryOfAddr = primaryNode.Addr
			node.SecondaryOfID = primaryNode.ID
		}

		tss.nodes = append(tss.nodes, node)
	}

	return nil
}