Skip to content

Commit

Permalink
Merge pull request #85 from libp2p/fix-race
Browse files Browse the repository at this point in the history
fix race condition in getConnsToClose
  • Loading branch information
marten-seemann authored Nov 28, 2021
2 parents 0ea56a8 + 4533d5e commit d51cbad
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 36 deletions.
21 changes: 11 additions & 10 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

var SilencePeriod = 10 * time.Second

var minCleanupInterval = 10 * time.Second

var log = logging.Logger("connmgr")

// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
Expand Down Expand Up @@ -247,8 +249,8 @@ func (cm *BasicConnMgr) background() {
if interval < cm.cfg.silencePeriod {
interval = cm.cfg.silencePeriod
}
if interval < 10*time.Second {
interval = 10 * time.Second
if interval < minCleanupInterval {
interval = minCleanupInterval
}

ticker := time.NewTicker(interval)
Expand Down Expand Up @@ -318,15 +320,13 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
return nil
}

nconns := int(atomic.LoadInt32(&cm.connCount))
if nconns <= cm.cfg.lowWater {
if int(atomic.LoadInt32(&cm.connCount)) <= cm.cfg.lowWater {
log.Info("open connection count below limit")
return nil
}

npeers := cm.segments.countPeers()
candidates := make([]*peerInfo, 0, npeers)
ncandidates := 0
candidates := make([]peerInfo, 0, cm.segments.countPeers())
var ncandidates int
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

cm.plk.RLock()
Expand All @@ -341,7 +341,9 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
// skip peers in the grace period.
continue
}
candidates = append(candidates, inf)
// note that we're copying the entry here,
// but since inf.conns is a map, it will still point to the original object
candidates = append(candidates, *inf)
ncandidates += len(inf.conns)
}
s.Unlock()
Expand Down Expand Up @@ -381,7 +383,6 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
// lock this to protect from concurrent modifications from connect/disconnect events
s := cm.segments.get(inf.id)
s.Lock()

if len(inf.conns) == 0 && inf.temp {
// handle temporary entries for early tags -- this entry has gone past the grace period
// and still holds no connections, so prune it.
Expand All @@ -390,8 +391,8 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
for c := range inf.conns {
selected = append(selected, c)
}
target -= len(inf.conns)
}
target -= len(inf.conns)
s.Unlock()
}

Expand Down
96 changes: 70 additions & 26 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

detectrace "github.com/ipfs/go-detect-race"

"github.com/libp2p/go-libp2p-core/network"
Expand Down Expand Up @@ -51,6 +53,7 @@ func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) netw
// Make sure multiple trim calls block.
func TestTrimBlocks(t *testing.T) {
cm := NewConnManager(200, 300, 0)
defer cm.Close()

cm.lastTrimMu.RLock()

Expand Down Expand Up @@ -79,6 +82,7 @@ func TestTrimBlocks(t *testing.T) {
func TestTrimCancels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cm := NewConnManager(200, 300, 0)
defer cm.Close()

cm.lastTrimMu.RLock()
defer cm.lastTrimMu.RUnlock()
Expand All @@ -103,6 +107,7 @@ func TestTrimClosed(t *testing.T) {
// Make sure joining an existing trim works.
func TestTrimJoin(t *testing.T) {
cm := NewConnManager(200, 300, 0)
defer cm.Close()
cm.lastTrimMu.RLock()
var wg sync.WaitGroup
wg.Add(3)
Expand All @@ -126,6 +131,7 @@ func TestTrimJoin(t *testing.T) {

func TestConnTrimming(t *testing.T) {
cm := NewConnManager(200, 300, 0)
defer cm.Close()
not := cm.Notifee()

var conns []network.Conn
Expand Down Expand Up @@ -162,39 +168,47 @@ func TestConnTrimming(t *testing.T) {
}

func TestConnsToClose(t *testing.T) {
cm := NewConnManager(0, 10, 0)
conns := cm.getConnsToClose()
if conns != nil {
t.Fatal("expected no connections")
}

cm = NewConnManager(10, 0, 0)
conns = cm.getConnsToClose()
if conns != nil {
t.Fatal("expected no connections")
}

cm = NewConnManager(1, 1, 0)
conns = cm.getConnsToClose()
if conns != nil {
t.Fatal("expected no connections")
addConns := func(cm *BasicConnMgr, n int) {
not := cm.Notifee()
for i := 0; i < n; i++ {
conn := randConn(t, nil)
not.Connected(nil, conn)
}
}

cm = NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
for i := 0; i < 5; i++ {
conn := randConn(t, nil)
not.Connected(nil, conn)
}
conns = cm.getConnsToClose()
if len(conns) != 0 {
t.Fatal("expected no connections")
}
t.Run("below hi limit", func(t *testing.T) {
cm := NewConnManager(0, 10, 0)
defer cm.Close()
addConns(cm, 5)
require.Empty(t, cm.getConnsToClose())
})

t.Run("below low limit", func(t *testing.T) {
cm := NewConnManager(10, 0, 0)
defer cm.Close()
addConns(cm, 5)
require.Empty(t, cm.getConnsToClose())
})

t.Run("below low and hi limit", func(t *testing.T) {
cm := NewConnManager(1, 1, 0)
defer cm.Close()
addConns(cm, 1)
require.Empty(t, cm.getConnsToClose())
})

t.Run("within silence period", func(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
defer cm.Close()
addConns(cm, 1)
require.Empty(t, cm.getConnsToClose())
})
}

func TestGetTagInfo(t *testing.T) {
start := time.Now()
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
defer cm.Close()
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
Expand Down Expand Up @@ -265,6 +279,7 @@ func TestGetTagInfo(t *testing.T) {

func TestTagPeerNonExistant(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
defer cm.Close()

id := tu.RandPeerIDFatal(t)
cm.TagPeer(id, "test", 1)
Expand All @@ -276,6 +291,7 @@ func TestTagPeerNonExistant(t *testing.T) {

func TestUntagPeer(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
defer cm.Close()
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
Expand Down Expand Up @@ -307,6 +323,7 @@ func TestGetInfo(t *testing.T) {
start := time.Now()
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
defer cm.Close()
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
Expand Down Expand Up @@ -334,6 +351,7 @@ func TestGetInfo(t *testing.T) {
func TestDoubleConnection(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
defer cm.Close()
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
Expand All @@ -350,6 +368,7 @@ func TestDoubleConnection(t *testing.T) {
func TestDisconnected(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
defer cm.Close()
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
Expand Down Expand Up @@ -387,6 +406,7 @@ func TestGracePeriod(t *testing.T) {

SilencePeriod = 0
cm := NewConnManager(10, 20, 100*time.Millisecond)
defer cm.Close()
SilencePeriod = 10 * time.Second

not := cm.Notifee()
Expand Down Expand Up @@ -444,6 +464,7 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
}

cm := NewConnManager(10, 20, 0)
defer cm.Close()
not := cm.Notifee()

var conns []network.Conn
Expand Down Expand Up @@ -480,6 +501,7 @@ func TestPeerProtectionSingleTag(t *testing.T) {

SilencePeriod = 0
cm := NewConnManager(19, 20, 0)
defer cm.Close()
SilencePeriod = 10 * time.Second

not := cm.Notifee()
Expand Down Expand Up @@ -567,6 +589,7 @@ func TestPeerProtectionMultipleTags(t *testing.T) {

SilencePeriod = 0
cm := NewConnManager(19, 20, 0)
defer cm.Close()
SilencePeriod = 10 * time.Second

not := cm.Notifee()
Expand Down Expand Up @@ -650,6 +673,7 @@ func TestPeerProtectionMultipleTags(t *testing.T) {
func TestPeerProtectionIdempotent(t *testing.T) {
SilencePeriod = 0
cm := NewConnManager(10, 20, 0)
defer cm.Close()
SilencePeriod = 10 * time.Second

id, _ := tu.RandPeerID()
Expand Down Expand Up @@ -681,6 +705,8 @@ func TestPeerProtectionIdempotent(t *testing.T) {

func TestUpsertTag(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
defer cm.Close()

not := cm.Notifee()
conn := randConn(t, nil)
rp := conn.RemotePeer()
Expand Down Expand Up @@ -737,6 +763,7 @@ func TestTemporaryEntriesClearedFirst(t *testing.T) {

func TestTemporaryEntryConvertedOnConnection(t *testing.T) {
cm := NewConnManager(1, 1, 0)
defer cm.Close()

conn := randConn(t, nil)
cm.TagPeer(conn.RemotePeer(), "test", 20)
Expand All @@ -754,3 +781,20 @@ func TestTemporaryEntryConvertedOnConnection(t *testing.T) {
t.Fatal("expected a non-temporary tag with value 20")
}
}

// see https://github.com/libp2p/go-libp2p-connmgr/issues/82
func TestConcurrentCleanupAndTagging(t *testing.T) {
origMinCleanupInterval := minCleanupInterval
t.Cleanup(func() { minCleanupInterval = origMinCleanupInterval })
minCleanupInterval = time.Millisecond

SilencePeriod = 0
cm := NewConnManager(1, 1, 0)
defer cm.Close()
SilencePeriod = 10 * time.Second

for i := 0; i < 1000; i++ {
conn := randConn(t, nil)
cm.TagPeer(conn.RemotePeer(), "test", 20)
}
}
4 changes: 4 additions & 0 deletions p2p/net/connmgr/decay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Moc
if !ok {
tb.Fatalf("connmgr does not support decay")
}
tb.Cleanup(func() {
mgr.Close()
decay.Close()
})

return mgr, decay, mockClock
}

0 comments on commit d51cbad

Please sign in to comment.