diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index 0bf02a8e..54a1e9ff 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -161,16 +161,22 @@ func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) { // CleanupRequest indicates the given request is complete on the client side, // and no further attempts will be made to load links for this request, // so any cached response data is invalid can be cleaned -func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) { +func (al *AsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID) { al.stateLk.Lock() defer al.stateLk.Unlock() + responseCache := al.responseCache aq, ok := al.requestQueues[requestID] if ok { - al.alternateQueues[aq].responseCache.FinishRequest(requestID) + responseCache = al.alternateQueues[aq].responseCache delete(al.requestQueues, requestID) - return } - al.responseCache.FinishRequest(requestID) + toFree := responseCache.FinishRequest(requestID) + if toFree > 0 { + err := al.allocator.ReleaseBlockMemory(p, toFree) + if err != nil { + log.Infow("Error deallocating requestor memory", "p", p, "toFree", toFree, "err", err) + } + } } func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue { diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index 5349a805..941554a6 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -219,7 +219,7 @@ func TestRegisterUnregister(t *testing.T) { err = asyncLoader.UnregisterPersistenceOption("other") require.EqualError(t, err, "cannot unregister while requests are in progress") asyncLoader.CompleteResponsesFor(requestID2) - asyncLoader.CleanupRequest(requestID2) + asyncLoader.CleanupRequest(p, requestID2) err = asyncLoader.UnregisterPersistenceOption("other") require.NoError(t, err) diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go index 0567cf57..d1d0b57b 100644 --- a/requestmanager/asyncloader/responsecache/responsecache.go +++ b/requestmanager/asyncloader/responsecache/responsecache.go @@ -18,7 +18,7 @@ var log = logging.Logger("graphsync") // UnverifiedBlockStore is an interface for storing blocks // as they come in and removing them as they are verified type UnverifiedBlockStore interface { - PruneBlocks(func(ipld.Link) bool) + PruneBlocks(func(ipld.Link, uint64) bool) PruneBlock(ipld.Link) VerifyBlock(ipld.Link, ipld.LinkContext) ([]byte, error) AddUnverifiedBlock(ipld.Link, []byte) @@ -42,15 +42,22 @@ func New(unverifiedBlockStore UnverifiedBlockStore) *ResponseCache { } // FinishRequest indicate there is no more need to track blocks tied to this -// response -func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) { +// response. It returns the total number of bytes in blocks that were being +// tracked but are no longer in memory +func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) uint64 { rc.responseCacheLk.Lock() rc.linkTracker.FinishRequest(requestID) - rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link) bool { - return rc.linkTracker.BlockRefCount(link) == 0 + toFree := uint64(0) + rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link, amt uint64) bool { + shouldPrune := rc.linkTracker.BlockRefCount(link) == 0 + if shouldPrune { + toFree += amt + } + return shouldPrune }) rc.responseCacheLk.Unlock() + return toFree } // AttemptLoad attempts to laod the given block from the cache diff --git a/requestmanager/asyncloader/responsecache/responsecache_test.go b/requestmanager/asyncloader/responsecache/responsecache_test.go index dad3fc31..aecf1aea 100644 --- a/requestmanager/asyncloader/responsecache/responsecache_test.go +++ b/requestmanager/asyncloader/responsecache/responsecache_test.go @@ -23,9 +23,9 @@ func (ubs *fakeUnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []by ubs.inMemoryBlocks[lnk] = data } -func (ubs *fakeUnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) { - for link := range ubs.inMemoryBlocks { - if shouldPrune(link) { +func (ubs *fakeUnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) bool) { + for link, data := range ubs.inMemoryBlocks { + if shouldPrune(link, uint64(len(data))) { delete(ubs.inMemoryBlocks, link) } } @@ -134,14 +134,16 @@ func TestResponseCacheManagingLinks(t *testing.T) { require.NoError(t, err) require.Nil(t, data, "no data should be returned for unknown block") - responseCache.FinishRequest(requestID1) + toFree := responseCache.FinishRequest(requestID1) // should remove only block 0, since it now has no refering outstanding requests require.Len(t, fubs.blocks(), len(blks)-4, "should prune block when it is orphaned") testutil.RefuteContainsBlock(t, fubs.blocks(), blks[0]) + require.Equal(t, toFree, uint64(len(blks[0].RawData()))) responseCache.FinishRequest(requestID2) // should remove last block since are no remaining references require.Len(t, fubs.blocks(), 0, "should prune block when it is orphaned") testutil.RefuteContainsBlock(t, fubs.blocks(), blks[3]) + require.Equal(t, toFree, uint64(len(blks[3].RawData()))) } diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go index 3e600be0..d1ecd448 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go @@ -40,9 +40,9 @@ func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte) // PruneBlocks removes blocks from the unverified store without committing them, // if the passed in function returns true for the given link -func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) { +func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) bool) { for link, data := range ubs.inMemoryBlocks { - if shouldPrune(link) { + if shouldPrune(link, uint64(len(data))) { delete(ubs.inMemoryBlocks, link) ubs.dataSize = ubs.dataSize - uint64(len(data)) } diff --git a/requestmanager/client.go b/requestmanager/client.go index 758270fa..92a3687e 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -82,7 +82,7 @@ type AsyncLoader interface { blks []blocks.Block) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult CompleteResponsesFor(requestID graphsync.RequestID) - CleanupRequest(requestID graphsync.RequestID) + CleanupRequest(p peer.ID, requestID graphsync.RequestID) } // RequestManager tracks outgoing requests and processes incoming reponses diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index e0cc8ed4..16e0b3f7 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -813,7 +813,7 @@ func TestPauseResume(t *testing.T) { // verify no further responses come through time.Sleep(100 * time.Millisecond) testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused") - td.fal.CleanupRequest(rr.gsr.ID()) + td.fal.CleanupRequest(peers[0], rr.gsr.ID()) // unpause err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2) @@ -893,7 +893,7 @@ func TestPauseResumeExternal(t *testing.T) { // verify no further responses come through time.Sleep(100 * time.Millisecond) testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused") - td.fal.CleanupRequest(rr.gsr.ID()) + td.fal.CleanupRequest(peers[0], rr.gsr.ID()) // unpause err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2) diff --git a/requestmanager/server.go b/requestmanager/server.go index b811c8c0..2f50d91b 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -168,7 +168,7 @@ func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *i rm.connManager.Unprotect(ipr.p, requestID.Tag()) delete(rm.inProgressRequestStatuses, requestID) ipr.cancelFn() - rm.asyncLoader.CleanupRequest(requestID) + rm.asyncLoader.CleanupRequest(ipr.p, requestID) if ipr.traverser != nil { ipr.traverserCancel() ipr.traverser.Shutdown(rm.ctx) diff --git a/requestmanager/testloader/asyncloader.go b/requestmanager/testloader/asyncloader.go index 557e9d64..3d896ed8 100644 --- a/requestmanager/testloader/asyncloader.go +++ b/requestmanager/testloader/asyncloader.go @@ -132,7 +132,7 @@ func (fal *FakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) // CleanupRequest simulates the effect of cleaning up the request by removing any response channels // for the request -func (fal *FakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) { +func (fal *FakeAsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID) { fal.responseChannelsLk.Lock() for key := range fal.responseChannels { if key.requestID == requestID {