Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #997 from dpw/eliminate-starts
Browse files Browse the repository at this point in the history
Eliminate most Start methods in router package
  • Loading branch information
rade committed Jun 24, 2015
2 parents 9a6afcd + 7387caa commit 76a12ed
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 83 deletions.
21 changes: 9 additions & 12 deletions router/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,23 @@ func (conn *RemoteConnection) String() string {
return fmt.Sprint("Connection ", from, "->", to)
}

func NewLocalConnection(connRemote *RemoteConnection, tcpConn *net.TCPConn, udpAddr *net.UDPAddr, router *Router) *LocalConnection {
// Does not return anything. If the connection is successful, it will
// end up in the local peer's connections map.
func StartLocalConnection(connRemote *RemoteConnection, tcpConn *net.TCPConn, udpAddr *net.UDPAddr, router *Router, acceptNewPeer bool) {
if connRemote.local != router.Ourself.Peer {
log.Fatal("Attempt to create local connection from a peer which is not ourself")
}
// NB, we're taking a copy of connRemote here.
return &LocalConnection{
actionChan := make(chan ConnectionAction, ChannelSize)
finished := make(chan struct{})
conn := &LocalConnection{
RemoteConnection: *connRemote,
Router: router,
TCPConn: tcpConn,
remoteUDPAddr: udpAddr,
effectivePMTU: DefaultPMTU}
}

// Async. Does not return anything. If the connection is successful,
// it will end up in the local peer's connections map.
func (conn *LocalConnection) Start(acceptNewPeer bool) {
actionChan := make(chan ConnectionAction, ChannelSize)
conn.actionChan = actionChan
finished := make(chan struct{})
conn.finished = finished
effectivePMTU: DefaultPMTU,
actionChan: actionChan,
finished: finished}
go conn.run(actionChan, finished, acceptNewPeer)
}

Expand Down
12 changes: 5 additions & 7 deletions router/connection_maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,17 @@ type Target struct {
type ConnectionMakerAction func() bool

func NewConnectionMaker(ourself *LocalPeer, peers *Peers, port int, discovery bool) *ConnectionMaker {
return &ConnectionMaker{
actionChan := make(chan ConnectionMakerAction, ChannelSize)
cm := &ConnectionMaker{
ourself: ourself,
peers: peers,
port: port,
discovery: discovery,
directPeers: peerAddrs{},
targets: make(map[string]*Target)}
}

func (cm *ConnectionMaker) Start() {
actionChan := make(chan ConnectionMakerAction, ChannelSize)
cm.actionChan = actionChan
targets: make(map[string]*Target),
actionChan: actionChan}
go cm.queryLoop(actionChan)
return cm
}

func (cm *ConnectionMaker) InitiateConnections(peers []string, replace bool) []error {
Expand Down
38 changes: 15 additions & 23 deletions router/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@ func (conn *LocalConnection) ensureForwarders() error {

forwarder := NewForwarder(conn, encryptor, udpSender, DefaultPMTU)
forwarderDF := NewForwarderDF(conn, encryptorDF, udpSenderDF, DefaultPMTU)
effectivePMTU := forwarderDF.unverifiedPMTU
forwarder.Start()
forwarderDF.Start()

// Various fields in the conn struct are read by other processes,
// so we have to use locks.
conn.Lock()
conn.forwarder = forwarder
conn.forwarderDF = forwarderDF
conn.effectivePMTU = effectivePMTU
conn.Unlock()

return nil
Expand Down Expand Up @@ -195,20 +191,18 @@ type Forwarder struct {
}

func NewForwarder(conn *LocalConnection, enc Encryptor, udpSender UDPSender, pmtu int) *Forwarder {
return &Forwarder{
ch := make(chan *ForwardedFrame, ChannelSize)
finished := make(chan struct{})
fwd := &Forwarder{
conn: conn,
ch: ch,
finished: finished,
enc: enc,
udpSender: udpSender,
maxPayload: pmtu - UDPOverhead,
processSendError: func(err error) error { return err }}
}

func (fwd *Forwarder) Start() {
ch := make(chan *ForwardedFrame, ChannelSize)
fwd.ch = ch
finished := make(chan struct{})
fwd.finished = finished
go fwd.run(ch, finished)
return fwd
}

func (fwd *Forwarder) Shutdown() {
Expand Down Expand Up @@ -310,25 +304,23 @@ type ForwarderDF struct {
}

func NewForwarderDF(conn *LocalConnection, enc Encryptor, udpSender UDPSender, pmtu int) *ForwarderDF {
ch := make(chan *ForwardedFrame, ChannelSize)
finished := make(chan struct{})
verifyPMTU := make(chan int, ChannelSize)
fwd := &ForwarderDF{
Forwarder: Forwarder{
conn: conn,
ch: ch,
finished: finished,
enc: enc,
udpSender: udpSender,
maxPayload: pmtu - UDPOverhead}}
maxPayload: pmtu - UDPOverhead},
verifyPMTU: verifyPMTU}
fwd.Forwarder.processSendError = fwd.processSendError
fwd.unverifiedPMTU = pmtu - fwd.effectiveOverhead()
return fwd
}

func (fwd *ForwarderDF) Start() {
ch := make(chan *ForwardedFrame, ChannelSize)
fwd.ch = ch
finished := make(chan struct{})
fwd.finished = finished
verifyPMTU := make(chan int, ChannelSize)
fwd.verifyPMTU = verifyPMTU
conn.setEffectivePMTU(fwd.unverifiedPMTU)
go fwd.run(ch, finished, verifyPMTU)
return fwd
}

func (fwd *ForwarderDF) PMTUVerified(pmtu int) {
Expand Down
14 changes: 8 additions & 6 deletions router/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ type GossipSender struct {
}

func NewGossipSender(send func(GossipData)) *GossipSender {
return &GossipSender{send: send}
}

func (sender *GossipSender) Start() {
sender.cell = make(chan GossipData, 1)
sender.flushch = make(chan chan bool)
cell := make(chan GossipData, 1)
flushch := make(chan chan bool)
sender := &GossipSender{
send: send,
cell: cell,
flushch: flushch,
}
go sender.run()
return sender
}

func (sender *GossipSender) run() {
Expand Down
2 changes: 0 additions & 2 deletions router/gossip_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (c *GossipChannel) sendDown(conn Connection, data GossipData) {
}
})
c.senders[conn] = sender
sender.Start()
}
sender.Send(data)
}
Expand Down Expand Up @@ -198,7 +197,6 @@ func (c *GossipChannel) relayBroadcast(srcName PeerName, update GossipData) erro
if !found {
broadcaster = NewGossipSender(func(pending GossipData) { c.sendBroadcast(srcName, pending) })
c.broadcasters[srcName] = broadcaster
broadcaster.Start()
}
broadcaster.Send(update)
return nil
Expand Down
9 changes: 2 additions & 7 deletions router/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,10 @@ type mockChannelConnection struct {
dest *Router
}

// Construct a "passive" Router, i.e. without any goroutines, except
// for Routes and GossipSenders.
func NewTestRouter(name string) *Router {
peerName, _ := PeerNameFromString(name)
router := NewRouter(Config{}, peerName, "")
// need to create a dummy channel otherwise tests hang on nil
// channels when the Router invoked ConnectionMaker.Refresh
router.ConnectionMaker.actionChan = make(chan ConnectionMakerAction, ChannelSize)
router.Routes.Start()
router := NewRouter(Config{}, peerName, "nick")
router.Start()
return router
}

Expand Down
14 changes: 7 additions & 7 deletions router/local_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ type LocalPeer struct {
type LocalPeerAction func()

func NewLocalPeer(name PeerName, nickName string, router *Router) *LocalPeer {
return &LocalPeer{Peer: NewPeer(name, nickName, 0, 0), router: router}
}

func (peer *LocalPeer) Start() {
actionChan := make(chan LocalPeerAction, ChannelSize)
peer.actionChan = actionChan
peer := &LocalPeer{
Peer: NewPeer(name, nickName, 0, 0),
router: router,
actionChan: actionChan,
}
go peer.actorLoop(actionChan)
return peer
}

func (peer *LocalPeer) Forward(dstPeer *Peer, df bool, frame []byte, dec *EthernetDecoder) error {
Expand Down Expand Up @@ -132,8 +133,7 @@ func (peer *LocalPeer) CreateConnection(peerAddr string, acceptNewPeer bool) err
return err
}
connRemote := NewRemoteConnection(peer.Peer, nil, tcpConn.RemoteAddr().String(), true, false)
connLocal := NewLocalConnection(connRemote, tcpConn, udpAddr, peer.router)
connLocal.Start(acceptNewPeer)
StartLocalConnection(connRemote, tcpConn, udpAddr, peer.router, acceptNewPeer)
return nil
}

Expand Down
6 changes: 2 additions & 4 deletions router/mac_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ type MacCache struct {
}

func NewMacCache(maxAge time.Duration, onExpiry func(net.HardwareAddr, *Peer)) *MacCache {
return &MacCache{
cache := &MacCache{
table: make(map[uint64]*MacCacheEntry),
maxAge: maxAge,
onExpiry: onExpiry}
}

func (cache *MacCache) Start() {
cache.setExpiryTimer()
return cache
}

func (cache *MacCache) Enter(mac net.HardwareAddr, peer *Peer) bool {
Expand Down
10 changes: 4 additions & 6 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func NewRouter(config Config, name PeerName, nickName string) *Router {
return router
}

// Start listening for TCP connections, locally captured packets, and
// packets forwarded over UDP. This is separate from NewRouter so
// that gossipers can register before we start forming connections.
func (router *Router) Start() {
// we need two pcap handles since they aren't thread-safe
var pio PacketSourceSink
Expand All @@ -95,10 +98,6 @@ func (router *Router) Start() {
po, err = NewPcapO(router.Iface.Name)
checkFatal(err)
}
router.Ourself.Start()
router.Macs.Start()
router.Routes.Start()
router.ConnectionMaker.Start()
router.UDPListener = router.listenUDP(router.Port, po)
router.listenTCP(router.Port)
if pio != nil {
Expand Down Expand Up @@ -224,8 +223,7 @@ func (router *Router) acceptTCP(tcpConn *net.TCPConn) {
remoteAddrStr := tcpConn.RemoteAddr().String()
log.Printf("->[%s] connection accepted\n", remoteAddrStr)
connRemote := NewRemoteConnection(router.Ourself.Peer, nil, remoteAddrStr, false, false)
connLocal := NewLocalConnection(connRemote, tcpConn, nil, router)
connLocal.Start(true)
StartLocalConnection(connRemote, tcpConn, nil, router, true)
}

func (router *Router) listenUDP(localPort int, po PacketSink) *net.UDPConn {
Expand Down
15 changes: 6 additions & 9 deletions router/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,23 @@ type Routes struct {
}

func NewRoutes(ourself *LocalPeer, peers *Peers) *Routes {
recalculate := make(chan *struct{}, 1)
wait := make(chan chan struct{})
routes := &Routes{
ourself: ourself,
peers: peers,
unicast: make(map[PeerName]PeerName),
unicastAll: make(map[PeerName]PeerName),
broadcast: make(map[PeerName][]PeerName),
broadcastAll: make(map[PeerName][]PeerName)}
broadcastAll: make(map[PeerName][]PeerName),
recalculate: recalculate,
wait: wait}
routes.unicast[ourself.Name] = UnknownPeerName
routes.unicastAll[ourself.Name] = UnknownPeerName
routes.broadcast[ourself.Name] = []PeerName{}
routes.broadcastAll[ourself.Name] = []PeerName{}
return routes
}

func (routes *Routes) Start() {
recalculate := make(chan *struct{}, 1)
wait := make(chan chan struct{})
routes.recalculate = recalculate
routes.wait = wait
go routes.run(recalculate, wait)
return routes
}

func (routes *Routes) PeerNames() PeerNameSet {
Expand Down

0 comments on commit 76a12ed

Please sign in to comment.