diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go
index 7f06cf97..95875c24 100644
--- a/benchmarks/benchmark_test.go
+++ b/benchmarks/benchmark_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/ipfs/go-graphsync/benchmarks/testinstance"
 	tn "github.com/ipfs/go-graphsync/benchmarks/testnet"
+	graphsync "github.com/ipfs/go-graphsync/impl"
 )
 
 const stdBlockSize = 8000
@@ -51,19 +52,25 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
 	tdm, err := newTempDirMaker(b)
 	require.NoError(b, err)
 	b.Run("test-20-10000", func(b *testing.B) {
-		subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
+		subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm)
 	})
 	b.Run("test-20-128MB", func(b *testing.B) {
-		subtestDistributeAndFetch(ctx, b, 10, delay.Fixed(0), time.Duration(0), allFilesUniformSize(128*(1<<20), defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
+		subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(128*(1<<20), defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm)
 	})
 	b.Run("test-p2p-stress-10-128MB", func(b *testing.B) {
-		p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm)
+		p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<20, 1024, true), tdm, nil, false)
 	})
 	b.Run("test-p2p-stress-10-128MB-1KB-chunks", func(b *testing.B) {
-		p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024), tdm)
+		p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024, true), tdm, nil, false)
+	})
+	b.Run("test-p2p-stress-1-1GB-memory-pressure", func(b *testing.B) {
+		p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
+	})
+	b.Run("test-p2p-stress-1-1GB-memory-pressure-no-raw-nodes", func(b *testing.B) {
+		p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
 	})
 	b.Run("test-repeated-disconnects-20-10000", func(b *testing.B) {
-		benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
+		benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm)
 	})
 }
@@ -71,7 +78,7 @@ func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes in
 	ctx, cancel := context.WithCancel(ctx)
 	mn := mocknet.New(ctx)
 	net := tn.StreamNet(ctx, mn)
-	ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
+	ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm, false)
 	instances, err := ig.Instances(numnodes + 1)
 	require.NoError(b, err)
 	var allCids [][]cid.Cid
@@ -132,13 +139,13 @@ func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes in
 	ig.Close()
 }
 
-func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) {
+func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker, options []graphsync.Option, diskBasedDatastore bool) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	mn := mocknet.New(ctx)
 	mn.SetLinkDefaults(mocknet.LinkOptions{Latency: 100 * time.Millisecond, Bandwidth: 3000000})
 	net := tn.StreamNet(ctx, mn)
-	ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
+	ig := testinstance.NewTestInstanceGenerator(ctx, net, options, tdm, diskBasedDatastore)
 	instances, err := ig.Instances(1 + b.N)
 	require.NoError(b, err)
 	var allCids []cid.Cid
@@ -160,32 +167,16 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
 		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 		require.NoError(b, err)
 		start := time.Now()
-		disconnectOn := rand.Intn(numfiles)
 		for j := 0; j < numfiles; j++ {
-			resultChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector)
+			responseChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector)
 			wg.Add(1)
 			go func(j int) {
 				defer wg.Done()
-				results := 0
-				for {
-					select {
-					case <-ctx.Done():
-						return
-					case <-resultChan:
-						results++
-						if results == 100 && j == disconnectOn {
-							mn.DisconnectPeers(instances[0].Peer, instances[i+1].Peer)
-							mn.UnlinkPeers(instances[0].Peer, instances[i+1].Peer)
-							time.Sleep(100 * time.Millisecond)
-							mn.LinkPeers(instances[0].Peer, instances[i+1].Peer)
-						}
-					case err, ok := <-errChan:
-						if !ok {
-							return
-						}
-						b.Fatalf("received error on request: %s", err.Error())
-					}
+				for range responseChan {
+				}
+				for err := range errChan {
+					b.Fatalf("received error on request: %s", err.Error())
 				}
 			}(j)
 		}
@@ -205,7 +196,7 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int,
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	net := tn.VirtualNetwork(d)
-	ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
+	ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm, false)
 	instances, err := ig.Instances(numnodes + b.N)
 	require.NoError(b, err)
 	destCids := df(ctx, b, instances[:numnodes])
@@ -268,7 +259,7 @@ type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Insta
 const defaultUnixfsChunkSize uint64 = 1 << 10
 const defaultUnixfsLinksPerLevel = 1024
 
-func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) cid.Cid {
+func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) cid.Cid {
 	data := make([]byte, size)
 	_, err := rand.Read(data)
@@ -283,7 +274,7 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block
 	params := ihelper.DagBuilderParams{
 		Maxlinks:   unixfsLinksPerLevel,
-		RawLeaves:  true,
+		RawLeaves:  useRawNodes,
 		CidBuilder: nil,
 		Dagserv:    bufferedDS,
 	}
@@ -300,11 +291,11 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block
 	return nd.Cid()
 }
 
-func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) distFunc {
+func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) distFunc {
 	return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
 		cids := make([]cid.Cid, 0, len(provs))
 		for _, prov := range provs {
-			c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel)
+			c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel, useRawNodes)
 			cids = append(cids, c)
 		}
 		return cids
diff --git a/benchmarks/testinstance/testinstance.go b/benchmarks/testinstance/testinstance.go
index b5b48f40..1b4048cd 100644
--- a/benchmarks/testinstance/testinstance.go
+++ b/benchmarks/testinstance/testinstance.go
@@ -8,6 +8,7 @@ import (
 	ds "github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-datastore/delayed"
 	ds_sync "github.com/ipfs/go-datastore/sync"
+	badgerds "github.com/ipfs/go-ds-badger"
 	blockstore "github.com/ipfs/go-ipfs-blockstore"
 	delay "github.com/ipfs/go-ipfs-delay"
 	"github.com/ipld/go-ipld-prime"
@@ -29,26 +30,28 @@ type TempDirGenerator interface {
 
 // NewTestInstanceGenerator generates a new InstanceGenerator for the given
 // testnet
-func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator) InstanceGenerator {
+func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator, diskBasedDatastore bool) InstanceGenerator {
 	ctx, cancel := context.WithCancel(ctx)
 	return InstanceGenerator{
-		net:              net,
-		seq:              0,
-		ctx:              ctx, // TODO take ctx as param to Next, Instances
-		cancel:           cancel,
-		gsOptions:        gsOptions,
-		tempDirGenerator: tempDirGenerator,
+		net:                net,
+		seq:                0,
+		ctx:                ctx, // TODO take ctx as param to Next, Instances
+		cancel:             cancel,
+		gsOptions:          gsOptions,
+		tempDirGenerator:   tempDirGenerator,
+		diskBasedDatastore: diskBasedDatastore,
 	}
 }
 
 // InstanceGenerator generates new test instances of bitswap+dependencies
 type InstanceGenerator struct {
-	seq              int
-	net              tn.Network
-	ctx              context.Context
-	cancel           context.CancelFunc
-	gsOptions        []gsimpl.Option
-	tempDirGenerator TempDirGenerator
+	seq                int
+	net                tn.Network
+	ctx                context.Context
+	cancel             context.CancelFunc
+	gsOptions          []gsimpl.Option
+	tempDirGenerator   TempDirGenerator
+	diskBasedDatastore bool
 }
 
 // Close closes the clobal context, shutting down all test instances
@@ -64,7 +67,7 @@ func (g *InstanceGenerator) Next() (Instance, error) {
 	if err != nil {
 		return Instance{}, err
 	}
-	return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir())
+	return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir(), g.diskBasedDatastore)
 }
 
 // Instances creates N test instances of bitswap + dependencies and connects
@@ -138,11 +141,23 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
 // NB: It's easy make mistakes by providing the same peer ID to two different
 // instances. To safeguard, use the InstanceGenerator to generate instances. It's
 // just a much better idea.
-func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string) (Instance, error) {
+func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string, diskBasedDatastore bool) (Instance, error) {
 	bsdelay := delay.Fixed(0)
 	adapter := net.Adapter(p)
-	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
+	var dstore ds.Batching
+	var err error
+	if diskBasedDatastore {
+		defopts := badgerds.DefaultOptions
+		defopts.SyncWrites = false
+		defopts.Truncate = true
+		dstore, err = badgerds.NewDatastore(tempDir, &defopts)
+		if err != nil {
+			return Instance{}, err
+		}
+	} else {
+		dstore = ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
+	}
 
 	bstore, err := blockstore.CachedBlockstore(ctx, blockstore.NewBlockstore(dstore), blockstore.DefaultCacheOpts())
diff --git a/go.mod b/go.mod
index 05fbe725..19fabf23 100644
--- a/go.mod
+++ b/go.mod
@@ -11,12 +11,14 @@ require (
 	github.com/ipfs/go-blockservice v0.1.3
 	github.com/ipfs/go-cid v0.0.6
 	github.com/ipfs/go-datastore v0.4.4
+	github.com/ipfs/go-ds-badger v0.2.1
 	github.com/ipfs/go-ipfs-blockstore v0.1.4
 	github.com/ipfs/go-ipfs-blocksutil v0.0.1
 	github.com/ipfs/go-ipfs-chunker v0.0.5
 	github.com/ipfs/go-ipfs-delay v0.0.1
 	github.com/ipfs/go-ipfs-exchange-offline v0.0.1
 	github.com/ipfs/go-ipfs-files v0.0.8
+	github.com/ipfs/go-ipfs-pq v0.0.2
 	github.com/ipfs/go-ipfs-routing v0.1.0
 	github.com/ipfs/go-ipfs-util v0.0.1
 	github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
@@ -32,6 +34,7 @@ require (
 	github.com/libp2p/go-libp2p v0.6.0
 	github.com/libp2p/go-libp2p-core v0.5.0
 	github.com/libp2p/go-libp2p-netutil v0.1.0
+	github.com/libp2p/go-libp2p-peer v0.2.0
 	github.com/libp2p/go-libp2p-record v0.1.1 // indirect
 	github.com/libp2p/go-libp2p-testing v0.1.1
 	github.com/libp2p/go-msgio v0.0.6
diff --git a/go.sum b/go.sum
index 91bdb59e..260b4e07 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,6 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
+github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4=
 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
@@ -168,6 +169,7 @@ github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ
 github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
 github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
 github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaHzfGTzuE3s=
+github.com/ipfs/go-ds-badger v0.2.1 h1:RsC9DDlwFhFdfT+s2PeC8joxbSp2YMufK8w/RBOxKtk=
 github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
 github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
 github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
diff --git a/impl/graphsync.go b/impl/graphsync.go
index 85398f3d..502ee4f7 100644
--- a/impl/graphsync.go
+++ b/impl/graphsync.go
@@ -17,6 +17,7 @@ import (
 	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/responsemanager" + "github.com/ipfs/go-graphsync/responsemanager/allocator" responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager" "github.com/ipfs/go-graphsync/responsemanager/persistenceoptions" @@ -26,6 +27,8 @@ import ( var log = logging.Logger("graphsync") const maxRecursionDepth = 100 +const defaultTotalMaxMemory = uint64(256 << 20) +const defaultMaxMemoryPerPeer = uint64(16 << 20) // GraphSync is an instance of a GraphSync exchange that implements // the graphsync protocol. @@ -53,6 +56,9 @@ type GraphSync struct { ctx context.Context cancel context.CancelFunc unregisterDefaultValidator graphsync.UnregisterHookFunc + allocator *allocator.Allocator + totalMaxMemory uint64 + maxMemoryPerPeer uint64 } // Option defines the functional option type that can be used to configure @@ -67,6 +73,18 @@ func RejectAllRequestsByDefault() Option { } } +func MaxMemoryResponder(totalMaxMemory uint64) Option { + return func(gs *GraphSync) { + gs.totalMaxMemory = totalMaxMemory + } +} + +func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option { + return func(gs *GraphSync) { + gs.maxMemoryPerPeer = maxMemoryPerPeer + } +} + // New creates a new GraphSync Exchange on the given network, // and the given link loader+storer. func New(parent context.Context, network gsnet.GraphSyncNetwork, @@ -83,10 +101,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingBlockHooks := requestorhooks.NewBlockHooks() requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks) peerTaskQueue := peertaskqueue.New() - createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender { - return peerresponsemanager.NewResponseSender(ctx, p, peerManager) - } - peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue) + persistenceOptions := persistenceoptions.New() incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions) outgoingBlockHooks := responderhooks.NewBlockHooks() @@ -95,7 +110,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners() blockSentListeners := responderhooks.NewBlockSentListeners() networkErrorListeners := responderhooks.NewNetworkErrorListeners() - responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners) unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth)) graphSync := &GraphSync{ network: network, @@ -116,8 +130,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, outgoingRequestHooks: outgoingRequestHooks, incomingBlockHooks: incomingBlockHooks, peerTaskQueue: peerTaskQueue, - peerResponseManager: peerResponseManager, - responseManager: responseManager, + totalMaxMemory: defaultTotalMaxMemory, + maxMemoryPerPeer: defaultMaxMemoryPerPeer, ctx: ctx, cancel: cancel, unregisterDefaultValidator: unregisterDefaultValidator, @@ -126,6 +140,15 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, for _, option := range options { option(graphSync) } + allocator := allocator.NewAllocator(graphSync.totalMaxMemory, 
graphSync.maxMemoryPerPeer) + graphSync.allocator = allocator + createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender { + return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator) + } + peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue) + graphSync.peerResponseManager = peerResponseManager + responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners) + graphSync.responseManager = responseManager asyncLoader.Startup() requestManager.SetDelegate(peerManager) diff --git a/responsemanager/allocator/allocator.go b/responsemanager/allocator/allocator.go new file mode 100644 index 00000000..1690542c --- /dev/null +++ b/responsemanager/allocator/allocator.go @@ -0,0 +1,174 @@ +package allocator + +import ( + "errors" + "sync" + + pq "github.com/ipfs/go-ipfs-pq" + peer "github.com/libp2p/go-libp2p-peer" +) + +type Allocator struct { + totalMemoryMax uint64 + perPeerMax uint64 + + allocLk sync.Mutex + total uint64 + nextAllocIndex uint64 + peerStatuses map[peer.ID]*peerStatus + peerStatusQueue pq.PQ +} + +func NewAllocator(totalMemoryMax uint64, perPeerMax uint64) *Allocator { + return &Allocator{ + totalMemoryMax: totalMemoryMax, + perPeerMax: perPeerMax, + total: 0, + peerStatuses: make(map[peer.ID]*peerStatus), + peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)), + } +} + +func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error { + responseChan := make(chan error, 1) + a.allocLk.Lock() + defer a.allocLk.Unlock() + + status, ok := a.peerStatuses[p] + if !ok { + status = &peerStatus{ + p: p, + totalAllocated: 0, + } + a.peerStatusQueue.Push(status) + a.peerStatuses[p] = status + } + + if (a.total+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 { + a.total += amount + status.totalAllocated += amount + responseChan <- nil + } else { + pendingAllocation := pendingAllocation{p, amount, responseChan, a.nextAllocIndex} + a.nextAllocIndex++ + status.pendingAllocations = append(status.pendingAllocations, pendingAllocation) + } + a.peerStatusQueue.Update(status.Index()) + return responseChan +} + +func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error { + a.allocLk.Lock() + defer a.allocLk.Unlock() + + status, ok := a.peerStatuses[p] + if !ok { + return errors.New("cannot deallocate from peer with no allocations") + } + status.totalAllocated -= amount + a.total -= amount + a.peerStatusQueue.Update(status.Index()) + a.processPendingAllocations() + return nil +} + +func (a *Allocator) ReleasePeerMemory(p peer.ID) error { + a.allocLk.Lock() + defer a.allocLk.Unlock() + status, ok := a.peerStatuses[p] + if !ok { + return errors.New("cannot deallocate peer with no allocations") + } + a.peerStatusQueue.Remove(status.Index()) + for _, pendingAllocation := range status.pendingAllocations { + pendingAllocation.response <- errors.New("Peer has been deallocated") + } + a.total -= status.totalAllocated + a.processPendingAllocations() + return nil +} + +func (a *Allocator) processPendingAllocations() { + for a.peerStatusQueue.Len() > 0 { + nextPeer := a.peerStatusQueue.Peek().(*peerStatus) + + if len(nextPeer.pendingAllocations) > 0 { + if !a.processNextPendingAllocationForPeer(nextPeer) { + return + } + 
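+			// the pending allocation was granted, so re-sort this peer in the queue before draining more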
+			a.peerStatusQueue.Update(nextPeer.Index())
+		} else {
+			if nextPeer.totalAllocated > 0 {
+				return
+			}
+			a.peerStatusQueue.Pop()
+			target := nextPeer.p
+			delete(a.peerStatuses, target)
+		}
+	}
+}
+
+func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool {
+	pendingAllocation := nextPeer.pendingAllocations[0]
+	if a.total+pendingAllocation.amount > a.totalMemoryMax {
+		return false
+	}
+	if nextPeer.totalAllocated+pendingAllocation.amount > a.perPeerMax {
+		return false
+	}
+	a.total += pendingAllocation.amount
+	nextPeer.totalAllocated += pendingAllocation.amount
+	nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:]
+	pendingAllocation.response <- nil
+	return true
+}
+
+type peerStatus struct {
+	p                  peer.ID
+	totalAllocated     uint64
+	index              int
+	pendingAllocations []pendingAllocation
+}
+
+type pendingAllocation struct {
+	p          peer.ID
+	amount     uint64
+	response   chan error
+	allocIndex uint64
+}
+
+// SetIndex stores the int index.
+func (ps *peerStatus) SetIndex(index int) {
+	ps.index = index
+}
+
+// Index returns the last given by SetIndex(int).
+func (ps *peerStatus) Index() int {
+	return ps.index
+}
+
+func makePeerStatusCompare(maxPerPeer uint64) pq.ElemComparator {
+	return func(a, b pq.Elem) bool {
+		pa := a.(*peerStatus)
+		pb := b.(*peerStatus)
+		if len(pa.pendingAllocations) == 0 {
+			if len(pb.pendingAllocations) == 0 {
+				return pa.totalAllocated < pb.totalAllocated
+			}
+			return false
+		}
+		if len(pb.pendingAllocations) == 0 {
+			return true
+		}
+		if pa.totalAllocated+pa.pendingAllocations[0].amount > maxPerPeer {
+			return false
+		}
+		if pb.totalAllocated+pb.pendingAllocations[0].amount > maxPerPeer {
+			return true
+		}
+		if pa.pendingAllocations[0].allocIndex < pb.pendingAllocations[0].allocIndex {
+			return true
+		}
+		return false
+	}
+}
diff --git a/responsemanager/allocator/allocator_test.go b/responsemanager/allocator/allocator_test.go
new file mode 100644
index 00000000..556dde0b
--- /dev/null
+++ b/responsemanager/allocator/allocator_test.go
@@ -0,0 +1,245 @@
+package allocator_test
+
+import (
+	"testing"
+
+	"github.com/ipfs/go-graphsync/responsemanager/allocator"
+	"github.com/ipfs/go-graphsync/testutil"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestAllocator(t *testing.T) {
+	peers := testutil.GeneratePeers(3)
+	testCases := map[string]struct {
+		total      uint64
+		maxPerPeer uint64
+		allocs     []alloc
+		totals     []map[peer.ID]uint64
+	}{
+		"single peer against total": {
+			total:      1000,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 400, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 300},
+				{peers[0]: 600},
+				{peers[0]: 900},
+				{peers[0]: 500},
+				{peers[0]: 800},
+			},
+		},
+		"single peer against self limit": {
+			total:      2000,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 400, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 300},
+				{peers[0]: 600},
+				{peers[0]: 900},
+				{peers[0]: 500},
+				{peers[0]: 800},
+			},
+		},
+		"multiple peers against total": {
+			total:      2000,
+			maxPerPeer: 2000,
+			allocs: []alloc{
+				{peers[0], 1000, false},
+				{peers[1], 900, false},
+				{peers[1], 400, false},
+				{peers[0], 300, false},
+				{peers[0], 500, true},
+				{peers[1], 500, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 1000},
+				{peers[0]: 1000, peers[1]: 900},
+				{peers[0]: 500, peers[1]: 900},
+				{peers[0]: 500, peers[1]: 1300},
+				{peers[0]: 500, peers[1]: 800},
+				{peers[0]: 800, peers[1]: 800},
+			},
+		},
+		"multiple peers against self limit": {
+			total:      5000,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 1000, false},
+				{peers[1], 900, false},
+				{peers[1], 400, false},
+				{peers[0], 300, false},
+				{peers[0], 500, true},
+				{peers[1], 500, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 1000},
+				{peers[0]: 1000, peers[1]: 900},
+				{peers[0]: 500, peers[1]: 900},
+				{peers[0]: 800, peers[1]: 900},
+				{peers[0]: 800, peers[1]: 400},
+				{peers[0]: 800, peers[1]: 800},
+			},
+		},
+		"multiple peers against mix of limits": {
+			total:      2700,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 800, false},
+				{peers[1], 900, false},
+				{peers[1], 400, false},
+				{peers[0], 300, false},
+				{peers[2], 1000, false},
+				{peers[2], 300, false},
+				{peers[0], 200, true},
+				{peers[2], 200, true},
+				{peers[2], 100, false},
+				{peers[1], 200, true},
+				{peers[2], 100, true},
+				{peers[1], 100, true},
+				{peers[2], 200, true},
+				{peers[0], 200, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 800},
+				{peers[0]: 800, peers[1]: 900},
+				{peers[0]: 800, peers[1]: 900, peers[2]: 1000},
+				{peers[0]: 600, peers[1]: 900, peers[2]: 1000},
+				{peers[0]: 600, peers[1]: 900, peers[2]: 800},
+				{peers[0]: 900, peers[1]: 900, peers[2]: 800},
+				{peers[0]: 900, peers[1]: 700, peers[2]: 800},
+				{peers[0]: 900, peers[1]: 700, peers[2]: 700},
+				{peers[0]: 900, peers[1]: 700, peers[2]: 1000},
+				{peers[0]: 900, peers[1]: 600, peers[2]: 1000},
+				{peers[0]: 900, peers[1]: 600, peers[2]: 800},
+				{peers[0]: 900, peers[1]: 1000, peers[2]: 800},
+				{peers[0]: 700, peers[1]: 1000, peers[2]: 800},
+				{peers[0]: 700, peers[1]: 1000, peers[2]: 900},
+			},
+		},
+		"multiple peers, peer drops off": {
+			total:      2000,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 1000, false},
+				{peers[1], 500, false},
+				{peers[2], 500, false},
+				{peers[1], 100, false},
+				{peers[2], 100, false},
+				{peers[2], 200, false},
+				{peers[1], 200, false},
+				{peers[2], 100, false},
+				{peers[1], 300, false},
+				// free peer 0 completely
+				{peers[0], 0, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 1000},
+				{peers[0]: 1000, peers[1]: 500},
+				{peers[0]: 1000, peers[1]: 500, peers[2]: 500},
+				{peers[0]: 0, peers[1]: 500, peers[2]: 500},
+				{peers[0]: 0, peers[1]: 800, peers[2]: 900},
+			},
+		},
+	}
+	for testCase, data := range testCases {
+		t.Run(testCase, func(t *testing.T) {
+			allocator := allocator.NewAllocator(data.total, data.maxPerPeer)
+			totals := map[peer.ID]uint64{}
+			currentTotal := 0
+			var pending []pendingResult
+			for _, alloc := range data.allocs {
+				var changedTotals bool
+				pending, changedTotals = readPending(t, pending, totals)
+				if changedTotals {
+					require.Less(t, currentTotal, len(data.totals))
+					require.Equal(t, data.totals[currentTotal], totals)
+					currentTotal++
+				}
+				if alloc.isDealloc {
+					if alloc.amount == 0 {
+						err := allocator.ReleasePeerMemory(alloc.p)
+						assert.NoError(t, err)
+						totals[alloc.p] = 0
+					} else {
+						err := allocator.ReleaseBlockMemory(alloc.p, alloc.amount)
+						assert.NoError(t, err)
+						totals[alloc.p] = totals[alloc.p] - alloc.amount
+					}
+					require.Less(t, currentTotal, len(data.totals))
+					require.Equal(t, data.totals[currentTotal], totals)
+					currentTotal++
+				} else {
+					allocated := allocator.AllocateBlockMemory(alloc.p, alloc.amount)
+					select {
+					case <-allocated:
+						totals[alloc.p] = totals[alloc.p] + alloc.amount
+						require.Less(t, currentTotal, len(data.totals))
+						require.Equal(t, data.totals[currentTotal], totals)
+						currentTotal++
+					default:
+						pending = append(pending, pendingResult{alloc.p, alloc.amount, allocated})
+					}
+				}
+			}
+			var changedTotals bool
+			_, changedTotals = readPending(t, pending, totals)
+			if changedTotals {
+				require.Less(t, currentTotal, len(data.totals))
+				require.Equal(t, data.totals[currentTotal], totals)
+				currentTotal++
+			}
+			require.Equal(t, len(data.totals), currentTotal)
+		})
+	}
+}
+
+func readPending(t *testing.T, pending []pendingResult, totals map[peer.ID]uint64) ([]pendingResult, bool) {
+	morePending := true
+	changedTotals := false
+	for morePending && len(pending) > 0 {
+		morePending = false
+	doneIter:
+		for i, next := range pending {
+			select {
+			case err := <-next.response:
+				require.NoError(t, err)
+				copy(pending[i:], pending[i+1:])
+				pending[len(pending)-1] = pendingResult{}
+				pending = pending[:len(pending)-1]
+				totals[next.p] = totals[next.p] + next.amount
+				changedTotals = true
+				morePending = true
+				break doneIter
+			default:
+			}
+		}
+	}
+	return pending, changedTotals
+}
+
+// amount 0 + isDealloc = true shuts down the whole peer
+type alloc struct {
+	p         peer.ID
+	amount    uint64
+	isDealloc bool
+}
+
+type pendingResult struct {
+	p        peer.ID
+	amount   uint64
+	response <-chan error
+}
diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go
index 74105ecf..a679fcc6 100644
--- a/responsemanager/peerresponsemanager/peerresponsesender.go
+++ b/responsemanager/peerresponsemanager/peerresponsesender.go
@@ -49,6 +49,13 @@ type PeerMessageHandler interface {
 	SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block, ...notifications.Notifee)
 }
 
+// Allocator is an interface that can manage memory allocated for blocks
+type Allocator interface {
+	AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
+	ReleasePeerMemory(p peer.ID) error
+	ReleaseBlockMemory(p peer.ID, amount uint64) error
+}
+
 // Transaction is a series of operations that should be send together in a single response
 type Transaction func(PeerResponseTransactionSender) error
 
@@ -57,18 +64,20 @@ type peerResponseSender struct {
 	ctx          context.Context
 	cancel       context.CancelFunc
 	peerHandler  PeerMessageHandler
+	allocator    Allocator
 	outgoingWork chan struct{}
 
-	linkTrackerLk      sync.RWMutex
-	linkTracker        *linktracker.LinkTracker
-	altTrackers        map[string]*linktracker.LinkTracker
-	dedupKeys          map[graphsync.RequestID]string
-	responseBuildersLk sync.RWMutex
-	responseBuilders   []*responsebuilder.ResponseBuilder
-	nextBuilderTopic   responsebuilder.Topic
-	queuedMessages     chan responsebuilder.Topic
-	subscriber         notifications.MappableSubscriber
-	publisher          notifications.Publisher
+	linkTrackerLk       sync.RWMutex
+	linkTracker         *linktracker.LinkTracker
+	altTrackers         map[string]*linktracker.LinkTracker
+	dedupKeys           map[graphsync.RequestID]string
+	responseBuildersLk  sync.RWMutex
+	responseBuilders    []*responsebuilder.ResponseBuilder
+	nextBuilderTopic    responsebuilder.Topic
+	queuedMessages      chan responsebuilder.Topic
+	subscriber          notifications.MappableSubscriber
+	allocatorSubscriber notifications.MappableSubscriber
+	publisher           notifications.Publisher
 }
 
 // PeerResponseSender handles batching, deduping, and sending responses for
@@ -109,7 +118,7 @@ type PeerResponseTransactionSender interface {
 
 // NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
 // using the given peer message handler.
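+// The allocator passed in is consulted before blocks are queued, so responses
+// respect the responder's configured memory limits.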
-func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender {
+func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler, allocator Allocator) PeerResponseSender {
 	ctx, cancel := context.WithCancel(ctx)
 	prs := &peerResponseSender{
 		p:              p,
@@ -122,8 +131,10 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa
 		altTrackers:    make(map[string]*linktracker.LinkTracker),
 		queuedMessages: make(chan responsebuilder.Topic, 1),
 		publisher:      notifications.NewPublisher(),
+		allocator:      allocator,
 	}
 	prs.subscriber = notifications.NewMappableSubscriber(&subscriber{prs}, notifications.IdentityTransform)
+	prs.allocatorSubscriber = notifications.NewMappableSubscriber(&allocatorSubscriber{prs}, notifications.IdentityTransform)
 	return prs
 }
 
@@ -383,6 +394,13 @@ func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) {
 }
 
 func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder), notifees []notifications.Notifee) bool {
+	if blkSize > 0 {
+		select {
+		case <-prs.allocator.AllocateBlockMemory(prs.p, blkSize):
+		case <-prs.ctx.Done():
+			return false
+		}
+	}
 	prs.responseBuildersLk.Lock()
 	defer prs.responseBuildersLk.Unlock()
 	if shouldBeginNewResponse(prs.responseBuilders, blkSize) {
@@ -416,7 +434,10 @@ func (prs *peerResponseSender) signalWork() {
 }
 
 func (prs *peerResponseSender) run() {
-	defer prs.publisher.Shutdown()
+	defer func() {
+		prs.publisher.Shutdown()
+		prs.allocator.ReleasePeerMemory(prs.p)
+	}()
 	prs.publisher.Startup()
 	for {
 		select {
@@ -438,6 +459,10 @@ func (prs *peerResponseSender) sendResponseMessages() {
 		if builder.Empty() {
 			continue
 		}
+		notifications.SubscribeOn(prs.publisher, builder.Topic(), notifications.Notifee{
+			Topic:      builder.BlockSize(),
+			Subscriber: prs.allocatorSubscriber,
+		})
 		responses, blks, err := builder.Build()
 		if err != nil {
 			log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
@@ -495,3 +520,22 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event
 func (s *subscriber) OnClose(topic notifications.Topic) {
 	s.prs.publisher.Close(topic)
 }
+
+type allocatorSubscriber struct {
+	prs *peerResponseSender
+}
+
+func (as *allocatorSubscriber) OnNext(topic notifications.Topic, event notifications.Event) {
+	blkSize, ok := topic.(uint64)
+	if !ok {
+		return
+	}
+	_, ok = event.(Event)
+	if !ok {
+		return
+	}
+	_ = as.prs.allocator.ReleaseBlockMemory(as.prs.p, blkSize)
+}
+
+func (as *allocatorSubscriber) OnClose(topic notifications.Topic) {
+}
diff --git a/responsemanager/peerresponsemanager/peerresponsesender_test.go b/responsemanager/peerresponsemanager/peerresponsesender_test.go
index ecdf91e8..0fce6181 100644
--- a/responsemanager/peerresponsemanager/peerresponsesender_test.go
+++ b/responsemanager/peerresponsemanager/peerresponsesender_test.go
@@ -18,6 +18,7 @@ import (
 	gsmsg "github.com/ipfs/go-graphsync/message"
 	"github.com/ipfs/go-graphsync/messagequeue"
 	"github.com/ipfs/go-graphsync/notifications"
+	"github.com/ipfs/go-graphsync/responsemanager/allocator"
 	"github.com/ipfs/go-graphsync/testutil"
 )
 
@@ -41,7 +42,8 @@ func TestPeerResponseSenderSendsResponses(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(1<<30, 1<<30)
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
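+	// limits are far above what this test sends, so the allocator never blocks here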
 	peerResponseSender.Startup()
 
 	bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData(), sendResponseNotifee1)
@@ -125,7 +127,8 @@ func TestPeerResponseSenderSendsVeryLargeBlocksResponses(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(1<<30, 1<<30)
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()
 
 	peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
@@ -185,7 +188,8 @@ func TestPeerResponseSenderSendsExtensionData(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(1<<30, 1<<30)
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()
 
 	peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
@@ -228,7 +232,8 @@ func TestPeerResponseSenderSendsResponsesInTransaction(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(1<<30, 1<<30)
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()
 	notifee, notifeeVerifier := testutil.NewTestNotifee("transaction", 10)
 	err := peerResponseSender.Transaction(requestID1, func(peerResponseSender PeerResponseTransactionSender) error {
@@ -270,7 +275,8 @@ func TestPeerResponseSenderIgnoreBlocks(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(1<<30, 1<<30)
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()
 
 	peerResponseSender.IgnoreBlocks(requestID1, links)
@@ -326,7 +332,8 @@ func TestPeerResponseSenderDupKeys(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(1<<30, 1<<30)
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()
 
 	peerResponseSender.DedupKey(requestID1, "applesauce")
@@ -382,6 +389,64 @@ func TestPeerResponseSenderDupKeys(t *testing.T) {
 
 }
 
+func TestPeerResponseSenderSendsResponsesMemoryPressure(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	p := testutil.GeneratePeers(1)[0]
+	requestID1 := graphsync.RequestID(rand.Int31())
+	blks := testutil.GenerateBlocksOfSize(5, 100)
+	links := make([]ipld.Link, 0, len(blks))
+	for _, block := range blks {
+		links = append(links, cidlink.Link{Cid: block.Cid()})
+	}
+	fph := newFakePeerHandler(ctx, t)
+	allocator := allocator.NewAllocator(300, 300)
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
+	peerResponseSender.Startup()
+
+	bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
+	assertSentOnWire(t, bd, blks[0])
+	fph.AssertHasMessage("did not send first message")
+	fph.AssertBlocks(blks[0])
+	fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse})
+
+	finishes := make(chan string, 2)
+	go func() {
+		_ = peerResponseSender.Transaction(requestID1, func(peerResponseSender PeerResponseTransactionSender) error {
+			bd = peerResponseSender.SendResponse(links[1], blks[1].RawData())
+			assertSentOnWire(t, bd, blks[1])
+			bd = peerResponseSender.SendResponse(links[2], blks[2].RawData())
+			assertSentOnWire(t, bd, blks[2])
+			bd = peerResponseSender.SendResponse(links[3], blks[3].RawData())
+			assertSentOnWire(t, bd, blks[3])
+			peerResponseSender.FinishRequest()
+			return nil
+		})
+		finishes <- "sent message"
+	}()
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		// let peer response manager know last message was sent so message sending can continue
+		finishes <- "freed memory"
+		fph.notifySuccess()
+	}()
+
+	var finishMessages []string
+	for i := 0; i < 2; i++ {
+		var finishMessage string
+		testutil.AssertReceive(ctx, t, finishes, &finishMessage, "should have completed")
+		finishMessages = append(finishMessages, finishMessage)
+	}
+	require.Equal(t, []string{"freed memory", "sent message"}, finishMessages)
+	fph.AssertHasMessage("did not send second message")
+	fph.AssertBlocks(blks[1], blks[2], blks[3])
+	fph.AssertResponses(expectedResponses{
+		requestID1: graphsync.RequestCompletedFull,
+	})
+}
+
 func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
 	for _, response := range responses {
 		if response.RequestID() == requestID {
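
As a quick reference for reviewers, below is a small standalone sketch (not part of the patch) of how the new allocator behaves on its own. It uses only the API introduced in responsemanager/allocator/allocator.go above; the peer ID and byte sizes are illustrative, and go-libp2p-core's peer package is used since its peer.ID is interchangeable with the deprecated go-libp2p-peer alias imported by the allocator (the new allocator_test.go relies on the same interchangeability).

package main

import (
	"fmt"

	"github.com/ipfs/go-graphsync/responsemanager/allocator"
	"github.com/libp2p/go-libp2p-core/peer"
)

func main() {
	// 1024 bytes total budget, 512 bytes per peer.
	a := allocator.NewAllocator(1024, 512)
	p := peer.ID("example-peer")

	// Fits under both limits, so the returned channel is ready immediately.
	<-a.AllocateBlockMemory(p, 400)

	// Would push this peer past its 512-byte limit, so it stays pending
	// until some of its memory is released.
	pending := a.AllocateBlockMemory(p, 400)
	select {
	case <-pending:
		fmt.Println("unexpected: allocation granted immediately")
	default:
		fmt.Println("allocation is pending")
	}

	// Releasing the first block frees per-peer budget and unblocks the
	// pending allocation.
	_ = a.ReleaseBlockMemory(p, 400)
	<-pending
	fmt.Println("pending allocation granted after release")
}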