-
Notifications
You must be signed in to change notification settings - Fork 17
/
dispatcher.go
78 lines (73 loc) · 1.52 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package kaca
import (
"log"
"strconv"
"strings"
)
// dispatcher holds the set of registered connections together with the
// channels through which its run loop receives work.
type dispatcher struct {
	// connections is the set of currently registered connections. run adds
	// an entry on register and removes it on unregister or when a send to
	// the connection would block.
	connections map[*connection]bool

	// broadcast carries messages that run offers to every registered
	// connection.
	broadcast chan []byte

	// sub carries subscription requests and pub carries publications. run
	// splits both on SPLIT_LINE: a sub message is read as
	// "<cid><SPLIT_LINE><topic>" and a pub message as
	// "<topic><SPLIT_LINE><payload>".
	sub, pub chan string

	// register and unregister carry connections to be added to, or removed
	// from, connections.
	register, unregister chan *connection
}
// NewDispatcher returns a dispatcher with an empty connection set and
// unbuffered channels. It does not start the event loop; the caller is
// expected to invoke run separately.
func NewDispatcher() *dispatcher {
	d := new(dispatcher)

	d.connections = make(map[*connection]bool)

	// Connection lifecycle channels.
	d.register = make(chan *connection)
	d.unregister = make(chan *connection)

	// Message channels.
	d.broadcast = make(chan []byte)
	d.sub = make(chan string)
	d.pub = make(chan string)

	return d
}
// run is the dispatcher's event loop. It never returns: each iteration
// serves exactly one event from the register, unregister, broadcast, sub or
// pub channel.
//
// Sub and pub messages are split on SPLIT_LINE. A message that does not
// contain the separator is logged and skipped instead of being indexed
// blindly, which previously caused an index-out-of-range panic.
func (d *dispatcher) run() {
	for {
		select {
		case c := <-d.register:
			d.connections[c] = true

		case c := <-d.unregister:
			// Only close send for connections that are still registered; a
			// stalled connection may already have been closed and removed
			// by sendOrDrop, and closing a channel twice panics.
			if _, ok := d.connections[c]; ok {
				delete(d.connections, c)
				close(c.send)
			}

		case m := <-d.broadcast:
			for c := range d.connections {
				d.sendOrDrop(c, m)
			}

		case m := <-d.sub:
			// Subscribe message: "<cid><SPLIT_LINE><topic>".
			msp := strings.Split(m, SPLIT_LINE)
			log.Println("sub->" + m)
			if len(msp) < 2 {
				log.Printf("sub: ignoring malformed message %q", m)
				continue
			}
			for c := range d.connections {
				if msp[0] == strconv.Itoa(int(c.cid)) {
					c.topics = append(c.topics, msp[1])
				}
			}

		case m := <-d.pub:
			// Publish message: "<topic><SPLIT_LINE><payload>".
			msp := strings.Split(m, SPLIT_LINE)
			log.Println("pub->" + m)
			if len(msp) < 2 {
				log.Printf("pub: ignoring malformed message %q", m)
				continue
			}
			for c := range d.connections {
				for _, t := range c.topics {
					if t == msp[0] {
						// Each receiver gets its own copy of the payload.
						d.sendOrDrop(c, []byte(msp[1]))
						// Deliver at most once per connection, even if the
						// topic was subscribed more than once.
						break
					}
				}
			}
		}
	}
}

// sendOrDrop offers m to c.send without blocking. If the send cannot proceed
// immediately, c.send is closed and c is removed from d.connections.
func (d *dispatcher) sendOrDrop(c *connection, m []byte) {
	select {
	case c.send <- m:
	default:
		close(c.send)
		delete(d.connections, c)
	}
}