From fe9b90929983b595c658a74773f6ef8f38b48b8e Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 26 Feb 2018 13:37:47 -0800 Subject: [PATCH] rafthttp: add "IsActive" methods to "Peer" Signed-off-by: Gyuho Lee --- etcdserver/util_test.go | 13 ++++++++----- rafthttp/http_test.go | 1 + rafthttp/peer.go | 3 +++ rafthttp/transport.go | 12 ++++++++++++ 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/etcdserver/util_test.go b/etcdserver/util_test.go index e0b75454c383..d99bcc1edaac 100644 --- a/etcdserver/util_test.go +++ b/etcdserver/util_test.go @@ -82,8 +82,11 @@ func (s *nopTransporterWithActiveTime) AddPeer(id types.ID, us []string) {} func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {} func (s *nopTransporterWithActiveTime) RemoveAllPeers() {} func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {} -func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] } -func (s *nopTransporterWithActiveTime) Stop() {} -func (s *nopTransporterWithActiveTime) Pause() {} -func (s *nopTransporterWithActiveTime) Resume() {} -func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time) { s.activeMap = am } +func (s *nopTransporterWithActiveTime) IsActive(id types.ID) bool { + return s.activeMap[id] != time.Time{} +} +func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] } +func (s *nopTransporterWithActiveTime) Stop() {} +func (s *nopTransporterWithActiveTime) Pause() {} +func (s *nopTransporterWithActiveTime) Resume() {} +func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time) { s.activeMap = am } diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index d3ae81104546..6da4e24843ac 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -386,6 +386,7 @@ func (pr *fakePeer) sendSnap(m raftsnap.Message) { func (pr *fakePeer) update(urls types.URLs) { pr.peerURLs = urls } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } +func (pr *fakePeer) isActive() bool { return true } func (pr *fakePeer) activeSince() time.Time { return time.Time{} } func (pr *fakePeer) stop() {} func (pr *fakePeer) Pause() { pr.paused = true } diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 58b51f034940..2692d36fddbd 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -73,6 +73,8 @@ type Peer interface { // connection hands over to the peer. The peer will close the connection // when it is no longer used. attachOutgoingConn(conn *outgoingConn) + // isActive returns true if the connection has become active with this peer. + isActive() bool // activeSince returns the time that the connection with the // peer becomes active. activeSince() time.Time @@ -256,6 +258,7 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) { } } +func (p *peer) isActive() bool { return p.status.isActive() } func (p *peer) activeSince() time.Time { return p.status.activeSince() } // Pause pauses the peer. The peer will simply drops all incoming diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 938482517aea..b759ed481330 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -337,6 +337,17 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { plog.Infof("updated peer %s", id) } +// IsActive returns true if the peer connection has been +// established, thus active. +func (t *Transport) IsActive(id types.ID) bool { + t.mu.RLock() + defer t.mu.RUnlock() + if p, ok := t.peers[id]; ok { + return p.isActive() + } + return false +} + func (t *Transport) ActiveSince(id types.ID) time.Time { t.mu.RLock() defer t.mu.RUnlock() @@ -390,6 +401,7 @@ func (s *nopTransporter) AddPeer(id types.ID, us []string) {} func (s *nopTransporter) RemovePeer(id types.ID) {} func (s *nopTransporter) RemoveAllPeers() {} func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} +func (s *nopTransporter) IsActive(id types.ID) bool { return true } func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } func (s *nopTransporter) Stop() {} func (s *nopTransporter) Pause() {}