diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go index 0b5aec7c..7f06cf97 100644 --- a/benchmarks/benchmark_test.go +++ b/benchmarks/benchmark_test.go @@ -29,6 +29,7 @@ import ( basicnode "github.com/ipld/go-ipld-prime/node/basic" ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + peer "github.com/libp2p/go-libp2p-core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" @@ -61,6 +62,74 @@ func BenchmarkRoundtripSuccess(b *testing.B) { b.Run("test-p2p-stress-10-128MB-1KB-chunks", func(b *testing.B) { p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024), tdm) }) + b.Run("test-repeated-disconnects-20-10000", func(b *testing.B) { + benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm) + }) +} + +func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes int, df distFunc, tdm *tempDirMaker) { + ctx, cancel := context.WithCancel(ctx) + mn := mocknet.New(ctx) + net := tn.StreamNet(ctx, mn) + ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm) + instances, err := ig.Instances(numnodes + 1) + require.NoError(b, err) + var allCids [][]cid.Cid + for i := 0; i < b.N; i++ { + thisCids := df(ctx, b, instances[1:]) + allCids = append(allCids, thisCids) + } + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + + allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() + + runtime.GC() + b.ResetTimer() + b.ReportAllocs() + fetcher := instances[0] + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + require.NoError(b, err) + start := time.Now() + for j := 0; j < numnodes; j++ { + instance := instances[j+1] + _, errChan := fetcher.Exchange.Request(ctx, instance.Peer, cidlink.Link{Cid: allCids[i][j]}, allSelector) + + wg.Add(1) + go func(other peer.ID) { + defer func() { + mn.DisconnectPeers(fetcher.Peer, other) + wg.Done() + }() + for { + select { + case <-ctx.Done(): + return + case err, ok := <-errChan: + if !ok { + return + } + b.Fatalf("received error on request: %s", err.Error()) + } + } + }(instance.Peer) + } + wg.Wait() + result := runStats{ + Time: time.Since(start), + Name: b.Name(), + } + benchmarkLog = append(benchmarkLog, result) + + cancel() + } + cancel() + time.Sleep(100 * time.Millisecond) + b.Logf("Number of running go-routines: %d", runtime.NumGoroutine()) + testinstance.Close(instances) + ig.Close() } func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) { diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index 87a0434f..9b422b22 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -111,6 +111,8 @@ func (mq *MessageQueue) Shutdown() { } func (mq *MessageQueue) runQueue() { + defer mq.eventPublisher.Shutdown() + mq.eventPublisher.Startup() for { select { case <-mq.outgoingWork: diff --git a/messagequeue/messagequeue_test.go b/messagequeue/messagequeue_test.go index 82b32235..1761d5a3 100644 --- a/messagequeue/messagequeue_test.go +++ b/messagequeue/messagequeue_test.go @@ -147,6 +147,7 @@ func TestProcessingNotification(t *testing.T) { messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup} messageQueue := New(ctx, peer, messageNetwork) + messageQueue.Startup() waitGroup.Add(1) blks := testutil.GenerateBlocksOfSize(3, 128) @@ -164,7 +165,6 @@ func TestProcessingNotification(t *testing.T) { messageQueue.AddResponses(newMessage.Responses(), blks, notifee) // wait for send attempt - messageQueue.Startup() waitGroup.Wait() var message gsmsg.GraphSyncMessage diff --git a/notifications/notifications_test.go b/notifications/notifications_test.go index 43c88467..f901a3de 100644 --- a/notifications/notifications_test.go +++ b/notifications/notifications_test.go @@ -143,7 +143,9 @@ func TestSubscribeOn(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() ps := notifications.NewPublisher() + ps.Startup() testPublisher(ctx, t, ps) + ps.Shutdown() }) } diff --git a/notifications/publisher.go b/notifications/publisher.go index 5c8d2cad..06778c1a 100644 --- a/notifications/publisher.go +++ b/notifications/publisher.go @@ -32,10 +32,13 @@ func NewPublisher() Publisher { cmdChan: make(chan cmd), closed: make(chan struct{}), } - go ps.start() return ps } +func (ps *publisher) Startup() { + go ps.start() +} + // Publish publishes an event for the given message id func (ps *publisher) Publish(topic Topic, event Event) { ps.lk.RLock() diff --git a/notifications/types.go b/notifications/types.go index 8c5c7c6c..e2c29ef2 100644 --- a/notifications/types.go +++ b/notifications/types.go @@ -30,6 +30,7 @@ type Publisher interface { Close(Topic) Publish(Topic, Event) Shutdown() + Startup() Subscribable } diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index e42e39d3..74105ecf 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -416,6 +416,8 @@ func (prs *peerResponseSender) signalWork() { } func (prs *peerResponseSender) run() { + defer prs.publisher.Shutdown() + prs.publisher.Startup() for { select { case <-prs.ctx.Done():