Skip to content

Commit

Permalink
rafthttp: add "IsActive" methods to "Peer"
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Mar 1, 2018
1 parent 48ff9e6 commit fe9b909
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 5 deletions.
13 changes: 8 additions & 5 deletions etcdserver/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ func (s *nopTransporterWithActiveTime) AddPeer(id types.ID, us []string) {}
func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {}
func (s *nopTransporterWithActiveTime) RemoveAllPeers() {}
func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] }
func (s *nopTransporterWithActiveTime) Stop() {}
func (s *nopTransporterWithActiveTime) Pause() {}
func (s *nopTransporterWithActiveTime) Resume() {}
func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time) { s.activeMap = am }
func (s *nopTransporterWithActiveTime) IsActive(id types.ID) bool {
return s.activeMap[id] != time.Time{}
}
func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] }
func (s *nopTransporterWithActiveTime) Stop() {}
func (s *nopTransporterWithActiveTime) Pause() {}
func (s *nopTransporterWithActiveTime) Resume() {}
func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time) { s.activeMap = am }
1 change: 1 addition & 0 deletions rafthttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ func (pr *fakePeer) sendSnap(m raftsnap.Message) {

func (pr *fakePeer) update(urls types.URLs) { pr.peerURLs = urls }
func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
func (pr *fakePeer) isActive() bool { return true }
func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
func (pr *fakePeer) stop() {}
func (pr *fakePeer) Pause() { pr.paused = true }
Expand Down
3 changes: 3 additions & 0 deletions rafthttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type Peer interface {
// connection hands over to the peer. The peer will close the connection
// when it is no longer used.
attachOutgoingConn(conn *outgoingConn)
// isActive returns true if the connection has become active with this peer.
isActive() bool
// activeSince returns the time that the connection with the
// peer becomes active.
activeSince() time.Time
Expand Down Expand Up @@ -256,6 +258,7 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) {
}
}

func (p *peer) isActive() bool { return p.status.isActive() }
func (p *peer) activeSince() time.Time { return p.status.activeSince() }

// Pause pauses the peer. The peer will simply drops all incoming
Expand Down
12 changes: 12 additions & 0 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,17 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
plog.Infof("updated peer %s", id)
}

// IsActive returns true if the peer connection has been
// established, thus active.
func (t *Transport) IsActive(id types.ID) bool {
t.mu.RLock()
defer t.mu.RUnlock()
if p, ok := t.peers[id]; ok {
return p.isActive()
}
return false
}

func (t *Transport) ActiveSince(id types.ID) time.Time {
t.mu.RLock()
defer t.mu.RUnlock()
Expand Down Expand Up @@ -390,6 +401,7 @@ func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) IsActive(id types.ID) bool { return true }
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
func (s *nopTransporter) Stop() {}
func (s *nopTransporter) Pause() {}
Expand Down

0 comments on commit fe9b909

Please sign in to comment.