Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connmgr: prefer peers with no streams when closing connections #1675

Merged
merged 2 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,11 @@ type peerInfo struct {

type peerInfos []peerInfo

func (p peerInfos) SortByValue() {
sort.Slice(p, func(i, j int) bool {
left, right := p[i], p[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
return left.value < right.value
})
}

func (p peerInfos) SortByValueAndStreams() {
// SortByValueAndStreams sorts peerInfos by their value and stream count. It
// will sort peers with no streams before those with streams (all else being
// equal). If `sortByMoreStreams` is true it will sort peers with more streams
// before those with fewer streams. This is useful to prioritize freeing memory.
func (p peerInfos) SortByValueAndStreams(sortByMoreStreams bool) {
sort.Slice(p, func(i, j int) bool {
left, right := p[i], p[j]
// temporary peers are preferred for pruning.
Expand All @@ -278,12 +270,21 @@ func (p peerInfos) SortByValueAndStreams() {
}
leftIncoming, leftStreams := incomingAndStreams(left.conns)
rightIncoming, rightStreams := incomingAndStreams(right.conns)
// prefer closing inactive connections (no streams open)
if rightStreams != leftStreams && (leftStreams == 0 || rightStreams == 0) {
return leftStreams < rightStreams
}
// incoming connections are preferred for pruning
if leftIncoming != rightIncoming {
return leftIncoming
}
// prune connections with a higher number of streams first
return rightStreams < leftStreams

if sortByMoreStreams {
// prune connections with a higher number of streams first
return rightStreams < leftStreams
} else {
return leftStreams < rightStreams
}
})
}

Expand Down Expand Up @@ -368,7 +369,7 @@ func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
cm.plk.RUnlock()

// Sort peers according to their value.
candidates.SortByValueAndStreams()
candidates.SortByValueAndStreams(true)

selected := make([]network.Conn, 0, target+10)
for _, inf := range candidates {
Expand Down Expand Up @@ -398,7 +399,7 @@ func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
}
cm.plk.RUnlock()

candidates.SortByValueAndStreams()
candidates.SortByValueAndStreams(true)
for _, inf := range candidates {
if target <= 0 {
break
Expand Down Expand Up @@ -459,7 +460,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
}

// Sort peers according to their value.
candidates.SortByValue()
candidates.SortByValueAndStreams(false)

target := ncandidates - cm.cfg.lowWater

Expand Down
61 changes: 53 additions & 8 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ func (c *tconn) RemotePeer() peer.ID {
return c.peer
}

func (c *tconn) Stat() network.ConnStats {
return network.ConnStats{
Stats: network.Stats{
Direction: network.DirOutbound,
},
NumStreams: 1,
}
}

func (c *tconn) RemoteMultiaddr() ma.Multiaddr {
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
if err != nil {
Expand Down Expand Up @@ -802,39 +811,75 @@ func TestPeerInfoSorting(t *testing.T) {
p1 := peerInfo{id: peer.ID("peer1")}
p2 := peerInfo{id: peer.ID("peer2"), temp: true}
pis := peerInfos{p1, p2}
pis.SortByValue()
pis.SortByValueAndStreams(false)
require.Equal(t, pis, peerInfos{p2, p1})
})

t.Run("starts with low-value connections", func(t *testing.T) {
p1 := peerInfo{id: peer.ID("peer1"), value: 40}
p2 := peerInfo{id: peer.ID("peer2"), value: 20}
pis := peerInfos{p1, p2}
pis.SortByValue()
pis.SortByValueAndStreams(false)
require.Equal(t, pis, peerInfos{p2, p1})
})

t.Run("in a memory emergency, starts with incoming connections", func(t *testing.T) {
t.Run("prefer peers with no streams", func(t *testing.T) {
p1 := peerInfo{id: peer.ID("peer1"),
conns: map[network.Conn]time.Time{
&mockConn{stats: network.ConnStats{NumStreams: 0}}: time.Now(),
},
}
p2 := peerInfo{id: peer.ID("peer2"),
conns: map[network.Conn]time.Time{
&mockConn{stats: network.ConnStats{NumStreams: 1}}: time.Now(),
},
}
pis := peerInfos{p2, p1}
pis.SortByValueAndStreams(false)
require.Equal(t, pis, peerInfos{p1, p2})
})

t.Run("in a memory emergency, starts with incoming connections and higher streams", func(t *testing.T) {
incoming := network.ConnStats{}
incoming.Direction = network.DirInbound
outgoing := network.ConnStats{}
outgoing.Direction = network.DirOutbound

outgoingSomeStreams := network.ConnStats{Stats: network.Stats{Direction: network.DirOutbound}, NumStreams: 1}
outgoingMoreStreams := network.ConnStats{Stats: network.Stats{Direction: network.DirOutbound}, NumStreams: 2}
p1 := peerInfo{
id: peer.ID("peer1"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoing}: time.Now(),
&mockConn{stats: outgoingSomeStreams}: time.Now(),
},
}
p2 := peerInfo{
id: peer.ID("peer2"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoingSomeStreams}: time.Now(),
&mockConn{stats: incoming}: time.Now(),
},
}
p3 := peerInfo{
id: peer.ID("peer3"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoing}: time.Now(),
&mockConn{stats: incoming}: time.Now(),
},
}
pis := peerInfos{p1, p2}
pis.SortByValueAndStreams()
require.Equal(t, pis, peerInfos{p2, p1})
p4 := peerInfo{
id: peer.ID("peer4"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoingMoreStreams}: time.Now(),
&mockConn{stats: incoming}: time.Now(),
},
}
pis := peerInfos{p1, p2, p3, p4}
pis.SortByValueAndStreams(true)
// p3 is first because it is inactive (no streams).
// p4 is second because it has the most streams and we priortize killing
// connections with the higher number of streams.
require.Equal(t, pis, peerInfos{p3, p4, p2, p1})
})

t.Run("in a memory emergency, starts with connections that have many streams", func(t *testing.T) {
Expand All @@ -852,7 +897,7 @@ func TestPeerInfoSorting(t *testing.T) {
},
}
pis := peerInfos{p1, p2}
pis.SortByValueAndStreams()
pis.SortByValueAndStreams(true)
require.Equal(t, pis, peerInfos{p2, p1})
})
}